fromCancellableFuture<T> function
Converts a Future factory (with cancellation token) to a Stream.
This allows you to create a Stream from an async operation that can be cancelled when the Stream subscription is cancelled. The cancellation token is passed to the block, allowing the async code to check for cancellation and stop early if needed.
When the Stream is cancelled (subscription disposed), the token's cancel()
method is called, setting token.isCancelled to true.
Example:
final stream = fromCancellableFuture((token) async {
await Future.delayed(Duration(seconds: 1));
if (token.isCancelled) return null; // Check and bail early
return await fetchData();
});
final subscription = stream.listen((data) => print(data));
// Later, cancel the operation
subscription.cancel();
Implementation
Stream<T> fromCancellableFuture<T>(
Future<T> Function(CancellationToken token) block,
) {
return Stream<T>.multi((controller) {
final token = CancellationToken();
controller.onCancel = () {
token.cancel();
};
try {
final future = block(token);
future.then(
(value) {
if (!token.isCancelled) {
controller.add(value);
controller.close();
}
},
onError: (Object error, StackTrace stackTrace) {
if (!token.isCancelled) {
controller.addError(error, stackTrace);
}
},
);
} catch (e, st) {
if (!token.isCancelled) {
controller.addError(e, st);
}
}
});
}