broadcastStream method

Stream<T> broadcastStream({
  bool waitForListeners = false,
  bool stopWhenNoListeners = true,
  bool closeReceiverOnDone = false,
  bool sync = false,
})

Convert a channel receiver to a broadcast stream for multiple listeners.

Implementation

/// Converts this channel receiver into a broadcast stream that multiple
/// listeners can subscribe to.
///
/// The receiver is read through a single subscription to `stream()`. Its data
/// and error events are forwarded to a broadcast [StreamController], and the
/// controller is closed once the source reports done. A broadcast controller
/// only delivers an event to the listeners subscribed at that moment, so
/// events forwarded while nobody is listening are not replayed later.
///
/// * [waitForListeners]: when `false` (the default) the source is subscribed
///   immediately, before any listener attaches. When `true` the source
///   subscription is created by the first listener.
/// * [stopWhenNoListeners]: only takes effect together with
///   [waitForListeners]. When both are `true`, the source subscription is
///   paused once the last listener cancels and resumed when a listener
///   attaches again. The source subscription is paused, not cancelled.
/// * [closeReceiverOnDone]: when `true`, `close()` is called after the source
///   completes, unless `recvDisconnected` is already `true`.
/// * [sync]: forwarded unchanged to [StreamController.broadcast].
Stream<T> broadcastStream({
  bool waitForListeners = false,
  bool stopWhenNoListeners = true,
  bool closeReceiverOnDone = false,
  bool sync = false,
}) {
  final source = stream();
  StreamSubscription<T>? sub;
  late final StreamController<T> ctrl;

  // Pause/resume handling is enabled only when both flags are set; with the
  // default `waitForListeners: false` the source is never paused.
  // NOTE(review): confirm that ignoring `stopWhenNoListeners` in that case is
  // intended.
  final shouldPause = waitForListeners && stopWhenNoListeners;

  // Subscribes to the source exactly once; later calls are no-ops.
  void startIfNeeded() {
    if (sub != null) return;
    final started = source.listen(
      ctrl.add,
      onError: ctrl.addError,
      onDone: () {
        if (closeReceiverOnDone && !recvDisconnected) close();
        // The `onDone` callback is `void`, so nothing can wait for the
        // controller's done future here.
        ctrl.close();
      },
      cancelOnError: false,
    );
    sub = started;
    if (shouldPause && !ctrl.hasListener) {
      started.pause();
    }
  }

  // `sub` is assigned from inside closures, so it cannot be type-promoted;
  // the helpers below copy it to a local before the null check.

  // Pauses the source subscription unless it is missing or already paused.
  void pauseIfNeeded() {
    final current = sub;
    if (current != null && !current.isPaused) current.pause();
  }

  // Resumes the source subscription if it exists and is currently paused.
  void resumeIfNeeded() {
    final current = sub;
    if (current != null && current.isPaused) current.resume();
  }

  ctrl = StreamController<T>.broadcast(
    sync: sync,
    onListen: () {
      startIfNeeded();
      if (shouldPause) resumeIfNeeded();
    },
    onCancel: () {
      if (shouldPause && !ctrl.hasListener) pauseIfNeeded();
    },
  );

  if (!waitForListeners) {
    startIfNeeded();
  }

  return ctrl.stream;
}