getMergedStream method

Stream<List<T>> getMergedStream(
  PaginationRequest request
)

Gets a merged stream that combines all source streams.

Implementation

/// Returns a single stream carrying the events of every stream that
/// `streamsProvider` yields for [request].
///
/// * When there are no source streams, the result emits one empty list and
///   then completes.
/// * When there is exactly one source stream, that stream is returned as-is.
/// * Otherwise the sources are subscribed to lazily (on first listen) and
///   their data and error events are forwarded in arrival order. Errors keep
///   their original stack trace. The merged stream completes once every
///   source has completed.
///
/// Pausing, resuming or cancelling the merged stream's subscription is
/// propagated to all source subscriptions.
Stream<List<T>> getMergedStream(PaginationRequest request) {
  final streams = streamsProvider(request);

  if (streams.isEmpty) {
    return Stream.value(<T>[]);
  }

  if (streams.length == 1) {
    return streams.first;
  }

  final subscriptions = <StreamSubscription<List<T>>>[];

  // Number of sources that have not yet emitted their done event. Set up
  // front so the merged stream only closes after *all* sources are finished.
  var activeSources = streams.length;

  // `late` because the callbacks below need to refer to the controller that
  // is still being constructed.
  late final StreamController<List<T>> controller;

  controller = StreamController<List<T>>(
    onListen: () {
      for (final stream in streams) {
        subscriptions.add(
          stream.listen(
            controller.add,
            // Tear-off accepts (error, stackTrace), so the trace is preserved.
            onError: controller.addError,
            onDone: () {
              activeSources--;
              if (activeSources == 0) {
                // Without this the merged stream would never complete and
                // consumers awaiting its end would hang.
                controller.close();
              }
            },
          ),
        );
      }
    },
    // Forward back-pressure so events are not buffered without bound while
    // the consumer is paused.
    onPause: () {
      for (final subscription in subscriptions) {
        subscription.pause();
      }
    },
    onResume: () {
      for (final subscription in subscriptions) {
        subscription.resume();
      }
    },
    // Start every cancellation before waiting, so one failing cancel cannot
    // leave the remaining subscriptions active.
    onCancel: () => Future.wait([
      for (final subscription in subscriptions) subscription.cancel(),
    ]),
  );

  return controller.stream;
}