asyncMap<E> method
Creates a new stream with each data event of this stream asynchronously mapped to a new event.
This acts like map, in that convert
function is called once per
data event, but here convert
may be asynchronous and return a Future.
If that happens, this stream waits for that future to complete before
continuing with further events.
The returned stream is a broadcast stream if this stream is.
Implementation
@override
Stream<E> asyncMap<E>(
FutureOr<E> Function(SupabaseStreamEvent event) convert) {
// Copied from [Stream.asyncMap]
final controller = BehaviorSubject<E>();
controller.onListen = () {
StreamSubscription<SupabaseStreamEvent> subscription = listen(null,
onError: controller.addError, // Avoid Zone error replacement.
onDone: controller.close);
FutureOr<void> add(E value) {
controller.add(value);
}
final addError = controller.addError;
final resume = subscription.resume;
subscription.onData((SupabaseStreamEvent event) {
FutureOr<E> newValue;
try {
newValue = convert(event);
} catch (e, s) {
controller.addError(e, s);
return;
}
if (newValue is Future<E>) {
subscription.pause();
newValue.then(add, onError: addError).whenComplete(resume);
} else {
controller.add(newValue as dynamic);
}
});
controller.onCancel = subscription.cancel;
if (!isBroadcast) {
controller
..onPause = subscription.pause
..onResume = resume;
}
};
return controller.stream;
}