subscribePrivate<T> method

Stream<HubEvent<T>> subscribePrivate<T>(
  String topic, {
  TransferBean<T>? bean,
  int? prefetch,
})
inherited

Subscribes to a topic of an EventHub.

Consider using HubConsumerService.listen or HubEventSocket.stream instead.

Subscribing twice, whether within the same service or across instances, results in both consumers receiving all events emitted during their lifetime.

Implementation

/// Subscribes to [topic] through a private queue and streams its events.
///
/// Nothing is opened until the returned stream is listened to. On listen, a
/// channel is opened with the given [prefetch], the topic exchange
/// `exchange` is declared, and a private queue bound to [topic] is declared
/// on it. Messages are consumed with `noAck: false`; each payload is decoded
/// as UTF-8 JSON and converted with [bean] when one is given and yields a
/// non-null object, otherwise with `decodeTyped<T>`. Every emitted
/// [HubEvent] carries the message's `ack` and `reject` callbacks.
///
/// Cancelling the subscription deletes the private queue and closes the
/// channel. This also holds when the cancel arrives while setup is still in
/// progress, and whatever was already acquired is released when setup fails.
/// Setup failures are reported as error events on the returned stream.
Stream<HubEvent<T>> subscribePrivate<T>(String topic,
    {TransferBean<T>? bean, int? prefetch}) {
  //TODO reduce duplicate code with subscribe
  final controller = StreamController<HubEvent<T>>();

  // True once the listener has cancelled; checked between setup steps.
  var cancelled = false;
  // True while onListen is still acquiring resources. During that window the
  // setup code owns the cleanup, so onCancel must not run it concurrently.
  var settingUp = false;
  // Releases everything acquired so far; replaced as setup progresses.
  Future<void> Function()? teardown;

  // Runs the current teardown at most once.
  Future<void> release() async {
    final pending = teardown;
    teardown = null;
    if (pending != null) {
      await pending();
    }
  }

  // Registered up front so that a cancel during setup is not lost.
  controller.onCancel = () async {
    cancelled = true;
    if (!settingUp) {
      await release();
    }
  };

  controller.onListen = () async {
    settingUp = true;
    try {
      final channel = await _brokerService.openChannel(prefetch: prefetch);
      _channels.add(channel);
      teardown = () async {
        await channel.close();
        _channels.remove(channel);
      };
      if (cancelled) return;

      final ex =
          await channel.declareExchange(exchange, BrokerExchangeType.topic);
      final q = await ex.declareAndBindPrivateQueue([topic]);
      _log.debug('Started private queue "${q.name}".');
      teardown = () async {
        await q.delete();
        _log.debug('Deleted private queue "${q.name}".');
        await channel.close();
        _channels.remove(channel);
      };
      if (cancelled) return;

      final Stream<HubEvent<T>> events =
          q.getConsumer(noAck: false).map((message) {
        // Decode once and reuse the result for both conversion paths.
        final decoded = jsonDecode(utf8.decode(message.payload));
        return HubEvent(
          bean?.toObject(decoded) ?? decodeTyped<T>(decoded),
          message.ack,
          message.reject,
        );
      });

      // Hand cleanup over to onCancel. There is no await between this line
      // and addStream, so a cancel cannot slip in between.
      settingUp = false;
      await controller.addStream(events);
    } catch (e, stack) {
      controller.addError(e, stack);
    } finally {
      // Still true only if setup failed or was cancelled before the consumer
      // was attached: release whatever was acquired.
      if (settingUp) {
        settingUp = false;
        try {
          await release();
        } catch (e, stack) {
          // The future returned from onListen is discarded, so surface
          // cleanup failures on the stream instead of letting them escape.
          controller.addError(e, stack);
        }
      }
    }
  };
  return controller.stream;
}