stopStream method

  1. @protected
Resolvable<Unit> stopStream()
inherited

Cancels the current input subscription, closes the broadcast controller, and resolves initialData with an Err (so awaiters don't hang forever). Safe to call repeatedly: subsequent calls become no-ops.

Implementation

/// Tears down the active stream and unblocks anyone waiting on initial data.
///
/// In order, this:
///
/// 1. detaches the current input subscription and, if there was one, queues
///    its cancellation;
/// 2. detaches the stream controller and, unless it is already closed, queues
///    its closure;
/// 3. detaches the initial-data completer and, if it is still pending,
///    resolves it with an [Err] so awaiters do not wait forever.
///
/// Each field is reset to `None` before its teardown is queued, so a repeated
/// call finds nothing left to tear down.
///
/// Returns the completion of the queued teardown tasks, mapped to [Unit].
@protected
Resolvable<Unit> stopStream() {
  final sequencer = TaskSequencer();

  // Queues [teardown] as the next task on the sequencer. The teardown always
  // runs; an error carried over from the previous task is only rethrown
  // afterwards, so cleanup is never skipped because an earlier step failed.
  void enqueue(Future<void> Function() teardown) {
    sequencer.then((previous) {
      assert(
        !previous.isErr(),
        'stopStream: unexpected Err in seq chain: $previous',
      );
      return Async(() async {
        await teardown();
        if (previous case Err(:final error)) throw error;
        return const None();
      });
    }).end();
  }

  // Step 1: input subscription. Take a snapshot and clear the field first,
  // then queue the cancellation of whatever was there.
  final oldSubscription = _streamSubscription;
  _streamSubscription = const None();
  if (oldSubscription case Some(value: final subscription)) {
    enqueue(subscription.cancel);
  }

  // Step 2: stream controller. A controller that is already closed is left
  // untouched.
  final oldController = _streamController;
  _streamController = const None();
  if (oldController case Some(value: final controller)
      when !controller.isClosed) {
    enqueue(controller.close);
  }

  // Step 3: initial-data completer. A pending completer is failed rather than
  // dropped, so that code awaiting initialData is released instead of hanging.
  final oldCompleter = _initDataCompleter;
  _initDataCompleter = const None();
  if (oldCompleter case Some(value: final completer)
      when !completer.isCompleted) {
    // Attach a do-nothing error handler up front: when nobody is awaiting
    // initialData, this keeps the synthetic stop error from being reported to
    // the surrounding zone as an uncaught future error.
    if (completer.resolvable().value case final Future<Object?> pending) {
      pending.then<void>(
        (_) {},
        onError: (_, [__]) {},
      );
    }
    completer
        .resolve(
          Sync.err(Err('Stream stopped before initial data was received.')),
        )
        .end();
  }

  return sequencer.completion.toUnit();
}