stopStream method
Cancels the current input subscription, closes the broadcast controller, and resolves initialData with an Err (so awaiters don't hang forever). Safe to call repeatedly: subsequent calls become no-ops.
Implementation
/// Tears down the stream: cancels the active input subscription, closes the
/// controller, and fails any still-pending initial-data completer.
///
/// Each backing field is swapped for `None` before its teardown is queued,
/// so a repeated call finds nothing left to tear down. The subscription
/// cancel and the controller close are queued, in that order, on a local
/// [TaskSequencer]; the returned [Resolvable] is that sequencer's completion
/// converted to [Unit].
@protected
Resolvable<Unit> stopStream() {
  final sequencer = TaskSequencer();

  // Queues a single teardown step on the sequencer. With assertions enabled,
  // an Err handed over from the previous step trips the assert; otherwise the
  // step still awaits [teardown] and only afterwards rethrows that error.
  void enqueue(Future<void> Function() teardown) {
    sequencer.then((prev) {
      assert(!prev.isErr(), 'stopStream: unexpected Err in seq chain: $prev');
      return Async(() async {
        await teardown();
        if (prev case Err(:final error)) throw error;
        return const None();
      });
    }).end();
  }

  // Detach the subscription from this object first, then queue its cancel.
  final activeSubscription = _streamSubscription;
  _streamSubscription = const None();
  if (activeSubscription case Some(value: final subscription)) {
    enqueue(subscription.cancel);
  }

  // Same for the controller; one that is already closed is left alone.
  final activeController = _streamController;
  _streamController = const None();
  if (activeController case Some(value: final controller)
      when !controller.isClosed) {
    enqueue(controller.close);
  }

  // Fail the initial-data completer so that anything awaiting initialData
  // is released instead of waiting forever. A no-op error handler is attached
  // to its future beforehand: if nobody is awaiting it right now, the
  // synthetic stop error must not be reported to the surrounding zone as an
  // uncaught future error.
  final pendingCompleter = _initDataCompleter;
  _initDataCompleter = const None();
  if (pendingCompleter case Some(value: final completer)
      when !completer.isCompleted) {
    if (completer.resolvable().value case final Future<Object?> pending) {
      pending.then<void>((_) {}, onError: (_, [__]) {});
    }
    completer
        .resolve(
          Sync.err(Err('Stream stopped before initial data was received.')),
        )
        .end();
  }

  return sequencer.completion.toUnit();
}