shellStreamLines function

Stream<String> shellStreamLines(
  Stream<List<int>> stream, {
  Encoding? encoding,
})

Basic line streaming. Assumes the system encoding when no encoding is given.

Implementation

/// Splits a byte [stream] into decoded text lines.
///
/// Bytes are buffered until a line feed (10) or a carriage return (13) is
/// seen; the buffered bytes are then decoded with [encoding] (defaulting to
/// `shellContext.encoding`) and emitted without the terminator. Empty lines
/// are never emitted, so a `\r\n` pair yields a single event. A trailing line
/// without terminator is emitted when [stream] is done.
///
/// Pausing the returned stream emits the pending partial line (if any) before
/// pausing the source subscription. Lines that fail to decode are printed and
/// skipped. Errors from [stream] are forwarded to the returned stream.
Stream<String> shellStreamLines(Stream<List<int>> stream,
    {Encoding? encoding}) {
  encoding ??= shellContext.encoding;
  StreamSubscription<List<int>>? subscription;
  // Bytes of the line being assembled; null when nothing is pending.
  List<int>? currentLine;
  const lineFeed = 10; // '\n'
  const carriageReturn = 13; // '\r'
  late StreamController<String> ctlr;

  /// Decodes and emits the pending line, then resets the buffer.
  ///
  /// The pending bytes are dropped without being emitted when the source
  /// subscription is paused or when the line is empty.
  void addCurrentLine() {
    final line = currentLine;
    currentLine = null;
    if (subscription?.isPaused ?? false) {
      // Do nothing, current line is discarded.
      return;
    }
    if (line == null || line.isEmpty) {
      return;
    }
    try {
      ctlr.add(encoding!.decode(line));
    } catch (_) {
      // Best effort: ignore a badly encoded line.
      print('ignoring: $line');
    }
  }

  /// Appends [data] to the pending line, starting a new one if needed.
  void addToCurrentLine(List<int> data) {
    final pending = currentLine;
    if (pending == null) {
      currentLine = data;
    } else {
      currentLine = Uint8List(pending.length + data.length)
        ..setAll(0, pending)
        ..setAll(pending.length, data);
    }
  }

  ctlr = StreamController<String>(onPause: () {
    if (shellDebug) {
      print('onPause (paused: ${subscription?.isPaused})');
    }
    // Flush what was read so far before pausing the source.
    addCurrentLine();
    subscription?.pause();
  }, onResume: () {
    if (subscription?.isPaused ?? false) {
      subscription?.resume();
    }
  }, onListen: () {
    subscription = stream.listen((data) {
      if (subscription?.isPaused ?? false) {
        return;
      }
      // Look for line terminators and emit every completed line.
      var start = 0;
      for (var i = 0; i < data.length; i++) {
        final byte = data[i];
        if (byte == lineFeed || byte == carriageReturn) {
          if (i > start) {
            addToCurrentLine(data.sublist(start, i));
          }
          addCurrentLine();
          // Skip the terminator itself.
          start = i + 1;
        }
      }
      // Keep the unterminated remainder for the next chunk.
      if (data.length > start) {
        addToCurrentLine(data.sublist(start));
      }
    }, onDone: () {
      // Emit the last, unterminated line.
      addCurrentLine();
      ctlr.close();
    }, onError: (Object e, StackTrace st) {
      ctlr.addError(e, st);
    });
  },
      // Return the cancel future so that cancelling the returned stream
      // completes only once the source subscription is cancelled.
      onCancel: () => subscription?.cancel());

  return ctlr.stream;
}