stream method

Stream<V> stream([
  1. dynamic limit = defaultLimit
])

Implementation

Stream<V> stream([limit = defaultLimit]) {
  late StreamController<V> stream;

  void handlePageAndGetNext(V page) {
    if (_cancelled) {
      stream.close();
      return;
    }

    if (stream.isPaused) {
      _bufferedPages.add(page);
    } else {
      stream.add(page);
    }

    // If this is the last page then end the stream
    if (page.isLast) {
      if (!stream.isPaused) {
        stream.close();
      }
      return;
    }

    // Otherwise get the next page
    _getPage(limit, page._next).then(handlePageAndGetNext);
  }

  stream = StreamController<V>(onListen: () {
    Future<V> firstPage;
    if (_bufferedPages.length == 1) {
      firstPage = Future.value(_bufferedPages.removeAt(0));
    } else {
      firstPage = first(limit);
    }
    firstPage.then(handlePageAndGetNext);
  }, onCancel: () {
    _cancelled = true;
    return;
  }, onResume: () {
    _bufferedPages.forEach(stream.add);
    if (_bufferedPages.last.isLast) {
      stream.close();
    }
    _bufferedPages.clear();
  });
  return stream.stream;
}