subscribe method
Subscribe creates a new Subscription.
eventType can be either a single event type, or a list of types to subscribe to multiple event types at once, under a single subscription (and stream).
If you want to subscribe to ALL events emitted in the bus, use
WildcardSubscription
as the eventType
:
eventbus.subscribe(WildcardSubscription)
Simple example:
var sub = await eventbus.subscribe(EventType); await sub.stream.forEach((event) { // The event is guaranteed to be of type EventType // Handle the event }); await sub.close();
Multi-type example:
var sub = await eventbus.subscribe(EventA, EventB
);
await sub.stream.forEach((event) {
if (event is EventA) {
// Handle EventA
} else if (event is EventB) {
// Handle EventB
}
});
await sub.close();
Implementation
@override
Subscription subscribe(dynamic eventType, {List<SubscriptionOpt>? opts}) {
final settings = SubSettings();
if (opts != null) {
for (final opt in opts) {
opt(settings);
}
}
// Handle wildcard subscription
if (identical(eventType, WildcardSubscription)) {
final controller = StreamController<Object>.broadcast(sync: false);
final sub = _WildcardSubscription(
controller: controller,
node: _wildcard,
name: settings.name,
metricsTracer: _metricsTracer,
);
_wildcard.addSink(_NamedSink(controller: controller, name: sub.name));
return sub;
}
// Handle regular subscriptions
List<String> types;
if (eventType is List) {
types = eventType.map((e) => e.toString()).toList();
// Check for wildcard in multi-type subscription
if (types.any((t) => identical(t, WildcardSubscription.toString()))) {
throw Exception('Wildcard subscriptions must be started separately');
}
} else {
types = [eventType.toString()];
}
final controller = StreamController<Object>.broadcast(sync: false);
final List<_Node> nodeListForSubscription = [];
final List<Future<void>> pendingInitializations = [];
final sub = _Subscription(
controller: controller,
nodes: nodeListForSubscription,
pendingOps: pendingInitializations,
dropper: _tryDropNode,
name: settings.name,
metricsTracer: _metricsTracer,
);
for (final eventTypeString in types) {
final future = _withNode(eventTypeString, (node) async {
await node.addSink(_NamedSink(controller: controller, name: sub.name));
nodeListForSubscription.add(node); // Add to the list passed to _Subscription
node.keepLast = true; // Always keep the last event when there are subscribers
_metricsTracer?.addSubscriber(eventTypeString);
// Deliver the last event directly if available
if (node.last != null) {
Future.microtask(() {
if (!controller.isClosed) {
controller.add(node.last!);
}
});
}
}, null);
pendingInitializations.add(future);
}
return sub;
}