subscribe<T> method
Subscribes to a topic of an EventHub.
Consider using HubConsumerService.listen or HubEventSocket.stream
instead.
Subscribing twice inside of the same service (even across instances)
will result in competing consumers. The service is identified via
the config value datahub.serviceName
.
Implementation
Stream<HubEvent<T>> subscribe<T>(String topic,
{TransferBean<T>? bean, int? prefetch}) {
final controller = StreamController<HubEvent<T>>();
controller.onListen = () async {
try {
final channel =
await _brokerService.openChannel().then((c) => c.qos(0, prefetch));
_channels.add(channel);
final queueName =
'$exchange.${resolve<ConfigService>().serviceName}.$topic';
final ex = await channel.exchange(exchange, ExchangeType.TOPIC);
final q = await ex.bindQueueConsumer(queueName, [topic], noAck: false);
q.listen(
(message) {
controller.add(HubEvent(
bean?.toObject(jsonDecode(message.payloadAsString)) ??
decodeTyped<T>(message.payloadAsJson),
message.ack,
message.reject,
));
},
onError: controller.addError,
onDone: controller.close,
);
controller.onCancel = q.cancel;
} catch (e, stack) {
controller.addError(e, stack);
}
};
return controller.stream;
}