notify method

  1. @protected
dynamic notify(
  1. FExecutionContext executionContext,
  2. dynamic event
)

Implementation

@protected
/*void | Promise<void>*/ dynamic notify(FExecutionContext executionContext,
    /*TEvent | FException*/ dynamic event) {
  if (this.__callbacks == null || this.__callbacks!.isEmpty) {
    return;
  }
  final List<FSubscriberChannelCallback<TData, TEvent>> callbacks =
      this.__callbacks!.toList(growable: false);
  if (event is FException) {
    this.__broken = true;
    this.__callbacks!.clear();
  }
  if (callbacks.length == 1) {
    final FSubscriberChannelCallback<TData, TEvent> callback = callbacks[0];
    return callback(executionContext, event);
  }
  final List<Future<void>> promises = [];
  final List<FException> errors = [];
  for (final callback in callbacks) {
    try {
      final dynamic result = callback(executionContext, event);
      if (result is Future) {
        promises.add(result);
      }
    } catch (e) {
      final FException ex = FException.wrapIfNeeded(e);
      errors.add(ex);
    }
  }

  if (promises.length == 1 && errors.isEmpty) {
    return promises[0];
  } else if (promises.isNotEmpty) {
    return Future.wait(promises.map((Future<void> p) {
      return p.catchError((Object e) {
        final FException ex = FException.wrapIfNeeded(e);
        errors.add(ex);
      });
    })).then((_) {
      if (errors.isNotEmpty) {
        for (final FException error in errors) {
          if (error is! FCancellationException) {
            throw FExceptionAggregate(errors);
          }
        }
        // So, all errors are FCancellationException instances, throw first
        throw errors[0];
      }
    });
  } else {
    if (errors.isNotEmpty) {
      for (final FException error in errors) {
        if (error is! FCancellationException) {
          throw FExceptionAggregate(errors);
        }
      }
      // So, all errors are FCancellationException instances, throw first
      throw errors[0];
    }
  }
}