RootPod<T extends Object>.fromStream constructor

RootPod<T extends Object>.fromStream(
  Stream<T> stream, {
  required T initialValue,
  void onError(Object error, StackTrace stack)?,
})

Creates a RootPod whose value starts at initialValue and then tracks stream. Errors emitted by the stream are passed to onError; if onError is not supplied, they are logged via df_log. The subscription is cancelled on dispose.

In-flight events and errors that arrive after dispose (before the async StreamSubscription.cancel propagates) are dropped.

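onAfterDispose is wired before stream.listen runs, and the constructor re-checks isDisposed as soon as stream.listen returns. Together these ensure that a synchronous emit on subscribe cannot leak the subscription if a callback disposes the pod before the subscription has been assigned: in that case onAfterDispose finds no subscription to cancel, and the re-check cancels it instead.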

Implementation

/// Creates a pod seeded with [initialValue] whose value follows [stream].
///
/// Data events received while the pod is not disposed are forwarded to
/// `_set`. Stream errors received while the pod is not disposed are handed to
/// [onError] when one is supplied; otherwise they are logged with `Log.err`.
/// Data and errors that arrive once `isDisposed` is true are ignored.
///
/// The subscription is cancelled from `onAfterDispose`. An error reported by
/// the future returned from `cancel` is logged rather than dropped.
RootPod.fromStream(
  Stream<T> stream, {
  required T initialValue,
  void Function(Object error, StackTrace stack)? onError,
}) : value = initialValue {
  // Single teardown path shared by both cancellation sites below: cancel
  // [target] and log any error its cancellation future completes with.
  void cancelAndLog(StreamSubscription<T> target) {
    target.cancel().catchError((Object e, StackTrace st) {
      Log.err(e, tags: {#df_pod});
    });
  }

  StreamSubscription<T>? activeSub;

  // Installed before `listen` is called so the hook already exists if a
  // callback disposes the pod while `listen` is still running. In that case
  // `activeSub` is still null here and the check after `listen` takes over.
  onAfterDispose = () {
    final pending = activeSub;
    if (pending != null) cancelAndLog(pending);
  };

  activeSub = stream.listen(
    (event) {
      // Ignore data that arrives after the pod has been disposed.
      if (!isDisposed) _set(event);
    },
    onError: (Object error, StackTrace stack) {
      // Ignore errors that arrive after the pod has been disposed.
      if (isDisposed) return;
      final handler = onError;
      if (handler == null) {
        Log.err(error, tags: {#df_pod});
        return;
      }
      handler(error, stack);
    },
  );

  // The pod may have been disposed while `listen` was still running, in
  // which case `onAfterDispose` found no subscription to cancel. Cancel the
  // freshly assigned one here so it does not outlive the pod.
  if (isDisposed) cancelAndLog(activeSub);
}