asyncMap<E> method

  1. @override
Stream<E> asyncMap<E>(
  1. FutureOr<E> convert(
    1. SupabaseStreamEvent event
    )
)
override

Creates a new stream with each data event of this stream asynchronously mapped to a new event.

This acts like map, in that convert function is called once per data event, but here convert may be asynchronous and return a Future. If that happens, this stream waits for that future to complete before continuing with further events.

The returned stream is a broadcast stream if this stream is.

Implementation

@override
Stream<E> asyncMap<E>(
    FutureOr<E> Function(SupabaseStreamEvent event) convert) {
  // Copied from [Stream.asyncMap]

  final controller = BehaviorSubject<E>();

  controller.onListen = () {
    StreamSubscription<SupabaseStreamEvent> subscription = listen(null,
        onError: controller.addError, // Avoid Zone error replacement.
        onDone: controller.close);
    FutureOr<void> add(E value) {
      controller.add(value);
    }

    final addError = controller.addError;
    final resume = subscription.resume;
    subscription.onData((SupabaseStreamEvent event) {
      FutureOr<E> newValue;
      try {
        newValue = convert(event);
      } catch (e, s) {
        controller.addError(e, s);
        return;
      }
      if (newValue is Future<E>) {
        subscription.pause();
        newValue.then(add, onError: addError).whenComplete(resume);
      } else {
        controller.add(newValue as dynamic);
      }
    });
    controller.onCancel = subscription.cancel;
    if (!isBroadcast) {
      controller
        ..onPause = subscription.pause
        ..onResume = resume;
    }
  };
  return controller.stream;
}