listen<T> method

void listen<T>(
  String topic, {
  required OnTopicCallBack<T> onTopic,
})

Adds a subscription to the given topic: `onTopic` is called with the data of every event emitted on that topic's stream.

Implementation

/// Subscribes [onTopic] to [topic].
///
/// A broadcast [StreamController] and a subscriber list are created for
/// [topic] the first time it is seen; later calls reuse the existing ones.
/// Each event emitted on the topic's stream has its `data` passed to
/// [onTopic]. The resulting stream subscription is recorded, together with
/// the callback, as an active [FkafkaSubscriber] in the topic's list.
void listen<T>(
  String topic, {
  required OnTopicCallBack<T> onTopic,
}) {
  // `putIfAbsent` returns the entry for the key whether it already existed
  // or was just created, so no second map lookup is needed.
  final topicController = _controllers.putIfAbsent(
    topic,
    () => StreamController.broadcast(),
  );
  final topicSubscribers = _subscribers.putIfAbsent(topic, () => []);

  final streamSubscription = topicController.stream.listen(
    (event) => onTopic(event.data),
  );

  final subscriber = FkafkaSubscriber<T>(
    isActive: true,
    onTopic: onTopic,
    subscription: streamSubscription,
  );
  topicSubscribers.add(subscriber);
}