copyToSink<T> function
Copies stream into sink.
copyToif specified, it is called instead ofsink.add. The implementation can callclose()if it'd like to stop the reading.
Note: stream errors are forwarded to sink via sink.addError but
the returned Future still completes successfully — callers cannot
observe stream errors via await. If you need to react to errors,
listen for them on sink or catch them upstream.
Implementation
Future copyToSink<T>(Stream<T> stream, EventSink<T> sink,
{bool cancelOnError = true, bool closeSink = true,
void copyTo(T event, void close())?}) {
final c = Completer();
late final StreamSubscription<T> sub;
var done = false;
void setDone() {
if (!done) {
done = true;
if (!c.isCompleted) c.complete();
if (closeSink) InvokeUtil.invokeSafely(sink.close);
//Required for the copyTo-`close()` path; redundant but idempotent elsewhere.
InvokeUtil.invokeSafely(sub.cancel);
}
}
sub = stream.listen(
copyTo == null ? sink.add: (data) => copyTo(data, setDone),
onError: (Object e, StackTrace st) {
if (!done) {
sink.addError(e, st);
if (cancelOnError)
setDone();
}
},
onDone: setDone,
cancelOnError: cancelOnError);
return c.future;
}