streamFromCache method

Stream<List<int>> streamFromCache({
  1. int begin = 0,
  2. int? end,
})

Creates a stream to serve data from begin to end, data is read from cache.

Implementation

Stream<List<int>> streamFromCache({int begin = 0, int? end}) {
  late StreamController<List<int>> controller;
  StreamSubscription? $subscription;
  controller = StreamController(onListen: () async {
    // print(''Stream[$begin-$end] - onListen');
    try {
      int $begin = begin;
      // int _written = 0;
      for (CacheFragment fragment in List.of(fragments)..sort((a, b) => a.begin - b.begin)) {
        if ((end != null && fragment.begin >= end) || fragment.end <= $begin) {
          continue;
        }
        Completer subscriptionCompleter = Completer();
        int relativeBegin = $begin - fragment.begin;
        int relativeEnd = end == null ? fragment.received : math.min(end - fragment.begin, fragment.received);
        $subscription = fragment.file?.openRead(relativeBegin, relativeEnd).listen(
          (event) {
            try {
              if (controller.isClosed) {
                subscriptionCompleter.complete();
                return;
              }
              controller.add(event);
            } catch (e, s) {
              if (!subscriptionCompleter.isCompleted) {
                subscriptionCompleter.completeError(e, s);
              }
            }
          },
          onError: (Object e, StackTrace s) {
            if (!subscriptionCompleter.isCompleted) {
              subscriptionCompleter.completeError(e, s);
            }
          },
          onDone: () {
            if (!subscriptionCompleter.isCompleted) {
              subscriptionCompleter.complete();
            }
          },
        );
        try {
          await subscriptionCompleter.future;
        } finally {
          try {
            await $subscription?.cancel();
          } catch (_) {}
        }
        $begin += relativeEnd - relativeBegin;
        // _written += _relativeEnd - _relativeBegin;
        if (end != null && $begin >= end) {
          break;
        }
      }
      // print('--[$begin-$end] written:$_written');
    } catch (e, s) {
      if (!controller.isClosed) {
        controller.addError(e, s);
      }
    }
    unawaited(controller.close());
  }, onPause: () {
    // print('Stream[$begin-$end] - onPause');
    $subscription?.pause();
  }, onResume: () {
    // print('Stream[$begin-$end] - onResume');
    $subscription?.resume();
  }, onCancel: () async {
    // print('Stream[$begin-$end] - onCancel');
    unawaited(controller.close());
  });
  return controller.stream;
}