pipe method

Future<void> pipe(
  1. AwaitableSink<T> sink, {
  2. bool closeOnDrained = true,
})

Implementation

Future<void> pipe(
  AwaitableSink<T> sink, {
  bool closeOnDrained = true,
}) async {
  while (hasMore()) {
    try {
      final itemFuture = pull();
      final item = itemFuture is Future ? await itemFuture : itemFuture;

      if (item is Some) {
        final addFuture = sink.add((item as Some).value);
        if (addFuture is Future) await addFuture;
      }
    } catch (err) {
      await sink.addError(err);
    }
  }

  if (closeOnDrained) await sink.close();
}