streamAddAwaited<T> static method

Future<T> streamAddAwaited<T>(
  StreamController<T> streamController,
  T value, {
  bool predicate(T event)?,
  Duration? timeout,
})

Implementation

/// Adds [value] to [streamController] and returns a future that completes
/// with the first event its stream emits afterwards.
///
/// A listener is attached to `streamController.stream` *before* [value] is
/// added, so the added value itself can be the event that completes the
/// future. When [predicate] is given, events for which it returns `false`
/// are skipped and the future completes with the first event for which it
/// returns `true`.
///
/// The returned future completes with an error when:
///  * the stream emits an error before a matching event (that error is
///    forwarded),
///  * [predicate] throws (the thrown object is forwarded),
///  * the stream closes before a matching event (a [StateError]),
///  * [timeout] is given and elapses first (a [TimeoutException]).
///
/// The internal subscription is cancelled as soon as the outcome is known.
/// Errors thrown synchronously by `listen` or `add` (for example, adding to
/// a closed controller) propagate synchronously to the caller.
static Future<T> streamAddAwaited<T>(
  StreamController<T> streamController,
  T value, {
  bool Function(T event)? predicate,
  Duration? timeout,
}) {
  final completer = Completer<T>();
  late final StreamSubscription<T> sub;

  // Both helpers settle the future at most once and always release the
  // subscription, so no path can double-complete the completer.
  void succeed(T event) {
    if (!completer.isCompleted) {
      completer.complete(event);
    }
    sub.cancel();
  }

  void fail(Object error, StackTrace stackTrace) {
    if (!completer.isCompleted) {
      completer.completeError(error, stackTrace);
    }
    sub.cancel();
  }

  sub = streamController.stream.listen(
    (event) {
      var matches = true;
      if (predicate != null) {
        try {
          matches = predicate(event);
        } catch (error, stackTrace) {
          // A throwing predicate would otherwise leave the future pending
          // forever; surface the failure to the caller instead.
          fail(error, stackTrace);
          return;
        }
      }
      if (matches) {
        succeed(event);
      }
    },
    onError: fail,
    onDone: () {
      // The stream ended without a matching event: nothing can complete
      // the future any more, so report that rather than hanging.
      if (!completer.isCompleted) {
        completer.completeError(
          StateError('Stream closed before a matching event was received'),
          StackTrace.current,
        );
      }
    },
  );

  try {
    streamController.add(value);
  } catch (_) {
    // `add` failed synchronously; drop the listener so it does not leak,
    // then let the original error reach the caller unchanged.
    sub.cancel();
    rethrow;
  }

  if (timeout != null) {
    return completer.future.timeout(timeout, onTimeout: () {
      sub.cancel();
      throw TimeoutException(
        'Timeout while waiting for stream event',
        timeout,
      );
    });
  }

  return completer.future;
}