subscribe<T> method

Stream<HubEvent<T>> subscribe<T>(
  1. String topic, {
  2. TransferBean<T>? bean,
  3. int? prefetch,
})

Subscribes to a topic of an EventHub.

Consider using HubConsumerService.listen or HubEventSocket.stream instead.

Subscribing twice inside of the same service (even across instances) will result in competing consumers. The service is identified via the config value datahub.serviceName.

Implementation

Stream<HubEvent<T>> subscribe<T>(String topic,
    {TransferBean<T>? bean, int? prefetch}) {
  final controller = StreamController<HubEvent<T>>();
  controller.onListen = () async {
    try {
      final channel =
          await _brokerService.openChannel().then((c) => c.qos(0, prefetch));
      _channels.add(channel);

      final queueName =
          '$exchange.${resolve<ConfigService>().serviceName}.$topic';
      final ex = await channel.exchange(exchange, ExchangeType.TOPIC);
      final q = await ex.bindQueueConsumer(queueName, [topic], noAck: false);
      q.listen(
        (message) {
          controller.add(HubEvent(
            bean?.toObject(jsonDecode(message.payloadAsString)) ??
                decodeTyped<T>(message.payloadAsJson),
            message.ack,
            message.reject,
          ));
        },
        onError: controller.addError,
        onDone: controller.close,
      );
      controller.onCancel = q.cancel;
    } catch (e, stack) {
      controller.addError(e, stack);
    }
  };
  return controller.stream;
}