subscribe method

  1. @override
Subscription subscribe(
  1. dynamic eventType, {
  2. List<SubscriptionOpt>? opts,
})
override

Subscribe creates a new Subscription.

eventType can be either a single event type, or a list of types to subscribe to multiple event types at once, under a single subscription (and stream).

If you want to subscribe to ALL events emitted in the bus, use WildcardSubscription as the eventType:

eventbus.subscribe(WildcardSubscription)

Simple example:

var sub = await eventbus.subscribe(EventType); await sub.stream.forEach((event) { // The event is guaranteed to be of type EventType // Handle the event }); await sub.close();

Multi-type example:

var sub = await eventbus.subscribe(EventA, EventB); await sub.stream.forEach((event) { if (event is EventA) { // Handle EventA } else if (event is EventB) { // Handle EventB } }); await sub.close();

Implementation

@override
Subscription subscribe(dynamic eventType, {List<SubscriptionOpt>? opts}) {
  final settings = SubSettings();
  if (opts != null) {
    for (final opt in opts) {
      opt(settings);
    }
  }

  // Handle wildcard subscription
  if (identical(eventType, WildcardSubscription)) {
    final controller = StreamController<Object>.broadcast(sync: false);
    final sub = _WildcardSubscription(
      controller: controller,
      node: _wildcard,
      name: settings.name,
      metricsTracer: _metricsTracer,
    );
    _wildcard.addSink(_NamedSink(controller: controller, name: sub.name));
    return sub;
  }

  // Handle regular subscriptions
  List<String> types;
  if (eventType is List) {
    types = eventType.map((e) => e.toString()).toList();

    // Check for wildcard in multi-type subscription
    if (types.any((t) => identical(t, WildcardSubscription.toString()))) {
      throw Exception('Wildcard subscriptions must be started separately');
    }
  } else {
    types = [eventType.toString()];
  }

  final controller = StreamController<Object>.broadcast(sync: false);
  final List<_Node> nodeListForSubscription = [];
  final List<Future<void>> pendingInitializations = [];

  final sub = _Subscription(
    controller: controller,
    nodes: nodeListForSubscription,
    pendingOps: pendingInitializations,
    dropper: _tryDropNode,
    name: settings.name,
    metricsTracer: _metricsTracer,
  );

  for (final eventTypeString in types) {
    final future = _withNode(eventTypeString, (node) async {
      await node.addSink(_NamedSink(controller: controller, name: sub.name));
      nodeListForSubscription.add(node); // Add to the list passed to _Subscription
      node.keepLast = true; // Always keep the last event when there are subscribers
      _metricsTracer?.addSubscriber(eventTypeString);
      // Deliver the last event directly if available
      if (node.last != null) {
        Future.microtask(() {
          if (!controller.isClosed) {
            controller.add(node.last!);
          }
        });
      }
    }, null);
    pendingInitializations.add(future);
  }

  return sub;
}