streamFromCache method
Creates a stream to serve data from begin
to end
, data is read from cache.
Implementation
Stream<List<int>> streamFromCache({int begin = 0, int? end}) {
late StreamController<List<int>> controller;
StreamSubscription? $subscription;
controller = StreamController(onListen: () async {
// print(''Stream[$begin-$end] - onListen');
try {
int $begin = begin;
// int _written = 0;
for (CacheFragment fragment in List.of(fragments)..sort((a, b) => a.begin - b.begin)) {
if ((end != null && fragment.begin >= end) || fragment.end <= $begin) {
continue;
}
Completer subscriptionCompleter = Completer();
int relativeBegin = $begin - fragment.begin;
int relativeEnd = end == null ? fragment.received : math.min(end - fragment.begin, fragment.received);
$subscription = fragment.file?.openRead(relativeBegin, relativeEnd).listen(
(event) {
try {
if (controller.isClosed) {
subscriptionCompleter.complete();
return;
}
controller.add(event);
} catch (e, s) {
if (!subscriptionCompleter.isCompleted) {
subscriptionCompleter.completeError(e, s);
}
}
},
onError: (Object e, StackTrace s) {
if (!subscriptionCompleter.isCompleted) {
subscriptionCompleter.completeError(e, s);
}
},
onDone: () {
if (!subscriptionCompleter.isCompleted) {
subscriptionCompleter.complete();
}
},
);
try {
await subscriptionCompleter.future;
} finally {
try {
await $subscription?.cancel();
} catch (_) {}
}
$begin += relativeEnd - relativeBegin;
// _written += _relativeEnd - _relativeBegin;
if (end != null && $begin >= end) {
break;
}
}
// print('--[$begin-$end] written:$_written');
} catch (e, s) {
if (!controller.isClosed) {
controller.addError(e, s);
}
}
unawaited(controller.close());
}, onPause: () {
// print('Stream[$begin-$end] - onPause');
$subscription?.pause();
}, onResume: () {
// print('Stream[$begin-$end] - onResume');
$subscription?.resume();
}, onCancel: () async {
// print('Stream[$begin-$end] - onCancel');
unawaited(controller.close());
});
return controller.stream;
}