RootPod<T extends Object>.fromStream constructor
RootPod<T extends Object>.fromStream (
- Stream<
T> stream, { - required T initialValue,
- void onError(
- Object error,
- StackTrace stack
Creates a RootPod whose value tracks stream. Errors emitted by
the stream are passed to onError; if onError is not supplied
they are logged via df_log. The subscription is cancelled on
dispose.
In-flight events that arrive after dispose (before the async StreamSubscription.cancel propagates) are dropped.
onAfterDispose is wired before stream.listen runs so a
synchronous emit on subscribe cannot leak the subscription if a
callback disposes the pod in the same microtask.
Implementation
RootPod.fromStream(
Stream<T> stream, {
required T initialValue,
void Function(Object error, StackTrace stack)? onError,
}) : value = initialValue {
StreamSubscription<T>? subscription;
onAfterDispose = () {
final s = subscription;
if (s == null) return;
// A throw from cancel — uncommon, usually a misbehaving stream
// transformer — should surface in the log, not be swallowed.
s.cancel().catchError((Object e, StackTrace st) {
Log.err(e, tags: {#df_pod});
});
};
subscription = stream.listen(
(v) {
if (isDisposed) return;
_set(v);
},
onError: (Object error, StackTrace stack) {
if (isDisposed) return;
if (onError != null) {
onError(error, stack);
} else {
Log.err(error, tags: {#df_pod});
}
},
);
// If a synchronous emit during `listen` already disposed us,
// `onAfterDispose` ran against a still-null `subscription`. Cancel
// the now-assigned subscription explicitly so it doesn't outlive
// the pod.
if (isDisposed) {
subscription.cancel().catchError((Object e, StackTrace st) {
Log.err(e, tags: {#df_pod});
});
}
}