stream method
Stream<V> stream([dynamic limit = defaultLimit])
Implementation
/// Returns a stream that emits pages one after another until the last page.
///
/// [limit] is forwarded to `first` for the initial page and to `_getPage`
/// for every following page; it defaults to `defaultLimit`.
///
/// Fetching starts when the stream is listened to. Pages that arrive while
/// the subscription is paused are stored in `_bufferedPages` (fetching of
/// further pages continues) and are flushed to the stream on resume. The
/// stream is closed once a page whose `isLast` is true has been delivered,
/// or when a page arrives after the subscription was cancelled.
///
/// NOTE(review): errors from `first` / `_getPage` are not forwarded to the
/// stream here; confirm whether callers expect them as stream errors.
Stream<V> stream([limit = defaultLimit]) {
  late StreamController<V> stream;

  // Delivers (or buffers) [page], then requests the next one unless done.
  void handlePageAndGetNext(V page) {
    if (_cancelled) {
      // The listener is gone: stop fetching and release the controller.
      stream.close();
      return;
    }
    if (stream.isPaused) {
      _bufferedPages.add(page);
    } else {
      stream.add(page);
    }
    // If this is the last page then end the stream.
    if (page.isLast) {
      // While paused the close is deferred to onResume, after the buffered
      // pages have been flushed.
      if (!stream.isPaused) {
        stream.close();
      }
      return;
    }
    // Otherwise get the next page.
    _getPage(limit, page._next).then(handlePageAndGetNext);
  }

  stream = StreamController<V>(onListen: () {
    // Use the single already-buffered page as the first page if there is
    // one; otherwise request the first page.
    Future<V> firstPage;
    if (_bufferedPages.length == 1) {
      firstPage = Future.value(_bufferedPages.removeAt(0));
    } else {
      firstPage = first(limit);
    }
    firstPage.then(handlePageAndGetNext);
  }, onCancel: () {
    _cancelled = true;
    return;
  }, onResume: () {
    // Nothing was buffered during the pause: `List.last` would throw a
    // StateError on an empty list, so there is nothing to flush or close.
    if (_bufferedPages.isEmpty) {
      return;
    }
    _bufferedPages.forEach(stream.add);
    // The close deferred by handlePageAndGetNext happens here, once the
    // final page has been handed to the stream.
    if (_bufferedPages.last.isLast) {
      stream.close();
    }
    _bufferedPages.clear();
  });
  return stream.stream;
}