subscribePrivate<T> method
Subscribes to a topic of an EventHub.
Consider using HubConsumerService.listen or HubEventSocket.stream
instead.
Subscribing twice inside of the same service and across instances will result in both consumers receiving all events emitted during their lifetime.
Implementation
Stream<HubEvent<T>> subscribePrivate<T>(String topic,
{TransferBean<T>? bean, int? prefetch}) {
//TODO reduce duplicate code with subscribe
final controller = StreamController<HubEvent<T>>();
controller.onListen = () async {
try {
final channel = await _brokerService.openChannel(prefetch: prefetch);
_channels.add(channel);
final ex =
await channel.declareExchange(exchange, BrokerExchangeType.topic);
final q = await ex.declareAndBindPrivateQueue([topic]);
_log.debug('Started private queue "${q.name}".');
controller.onCancel = () async {
await q.delete();
_log.debug('Deleted private queue "${q.name}".');
await channel.close();
_channels.remove(channel);
};
await controller.addStream(q.getConsumer(noAck: false).map((message) {
return HubEvent(
bean?.toObject(jsonDecode(utf8.decode(message.payload))) ??
decodeTyped<T>(jsonDecode(utf8.decode(message.payload))),
message.ack,
message.reject,
);
}));
} catch (e, stack) {
controller.addError(e, stack);
}
};
return controller.stream;
}