useObservable<T> function
A hook that consumes a Stream and returns its latest result as an AsyncSnapshot.
The streamFactory is called when the widget is mounted or whenever keys change.
The stream subscription is automatically cancelled when the widget is unmounted
or when the keys change.
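Returns an AsyncSnapshot that represents the current state:
- AsyncSnapshot.waiting initially, and again whenever the keys change (pending state)
- AsyncSnapshot.withData (ConnectionState.active) when a value is emitted
- AsyncSnapshot.withError (ConnectionState.active) when the stream emits an error
- The last snapshot, marked ConnectionState.done, when the stream completes
- AsyncSnapshot.withError (ConnectionState.done) if streamFactory or listen throws synchronously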
Example:
/// Example widget that renders the user identified by [userId], rebuilding
/// whenever the watched stream reports a new event.
class MyWidget extends HookWidget {
  /// Creates the widget for [userId]; [key] is forwarded to the superclass.
  const MyWidget({super.key, required this.userId});

  /// Identifier of the user to watch; also the only entry in the hook's keys.
  final String userId;

  @override
  Widget build(BuildContext context) {
    // Listing userId in the keys makes the hook re-subscribe when it changes.
    final snapshot = useObservable(
      () => userService.watchUser(userId),
      [userId],
    );
    // NOTE(review): `userService` and `mapOrElse` are not defined in this
    // snippet — presumably supplied by the surrounding project; confirm.
    return snapshot.mapOrElse(
      ok: (user) => Text(user.name),
      err: (error) => Text('Error: $error'),
      pending: () => const CircularProgressIndicator(),
    );
  }
}
Implementation
/// Subscribes to the stream produced by [streamFactory] and exposes its most
/// recent event as an [AsyncSnapshot].
///
/// The subscription is created inside an effect keyed on [keys]. Each time
/// the effect runs, the snapshot is first reset to [AsyncSnapshot.waiting];
/// the effect's disposer cancels the subscription.
///
/// The returned snapshot is:
/// * waiting until the first event arrives,
/// * data or error with [ConnectionState.active] for each stream event,
/// * the previous snapshot re-tagged [ConnectionState.done] on completion,
/// * an error with [ConnectionState.done] if creating or listening to the
///   stream throws synchronously.
AsyncSnapshot<T> useObservable<T>(
  Stream<T> Function() streamFactory,
  List<Object?> keys,
) {
  final state = useState(AsyncSnapshot<T>.waiting());
  final element = useContext();

  useEffect(() {
    // Every (re)subscription starts over from the pending state.
    state.value = AsyncSnapshot<T>.waiting();

    // Each handler bails out early once the hosting element is unmounted.
    void handleData(T event) {
      if (!element.mounted) return;
      state.value = AsyncSnapshot<T>.withData(ConnectionState.active, event);
    }

    void handleError(Object error, StackTrace stackTrace) {
      if (!element.mounted) return;
      state.value = AsyncSnapshot<T>.withError(
        ConnectionState.active,
        error,
        stackTrace,
      );
    }

    void handleDone() {
      if (!element.mounted) return;
      // Completion keeps the last data/error and only changes the state.
      state.value = state.value.inState(ConnectionState.done);
    }

    StreamSubscription<T>? activeSubscription;
    try {
      activeSubscription = streamFactory().listen(
        handleData,
        onError: handleError,
        onDone: handleDone,
      );
    } catch (error, stackTrace) {
      // A synchronous failure leaves no subscription; surface it as a
      // terminal error snapshot instead.
      state.value = AsyncSnapshot<T>.withError(
        ConnectionState.done,
        error,
        stackTrace,
      );
    }

    return () {
      activeSubscription?.cancel();
    };
  }, keys);

  return state.value;
}