listen<TEvent> method
void listen<TEvent>(
  HubEventSocket<TEvent> hubSocket,
  FutureOr<void> listener(TEvent event), {
  int? prefetch,
})
Subscribes to a HubEventSocket.
Subscribing twice inside of the same service (even across instances)
will result in competing consumers. The service is identified via
the config value `datahub.serviceName`.
Implementation
/// Subscribes [listener] to the events delivered by [hubSocket].
///
/// For every event, `event.data` is handed to [listener]:
///  * if the listener completes normally, the event is acknowledged with
///    `event.ack()`;
///  * if the listener throws (any error, including a [StateError]), the
///    event is rejected with `event.reject(true)` and the failure is logged.
///
/// [prefetch] is forwarded unchanged to `hubSocket.getStream`.
///
/// When the stream itself reports an error, the subscription is cancelled
/// (waiting at most 30 seconds for the cancellation), removed from
/// `_subscriptions`, and after a 3 second pause [listen] is invoked again
/// with the same arguments to re-establish the subscription.
void listen<TEvent>(
  HubEventSocket<TEvent> hubSocket,
  FutureOr<void> Function(TEvent event) listener, {
  int? prefetch,
}) {
  // `late` because the onError callback below refers to the subscription
  // that is only assigned once `listen` returns.
  late StreamSubscription sub;
  sub = hubSocket.getStream(prefetch: prefetch).listen(
    (event) async {
      // Step 1: run the user callback. This is kept in its own try block so
      // that a StateError raised by the listener is not mistaken for a
      // failed ack (which would leave the event neither acked nor rejected).
      try {
        await listener(event.data);
      } catch (e, stack) {
        // NOTE(review): the `true` argument presumably requests a requeue —
        // confirm against the event type's `reject` documentation.
        event.reject(true);
        _log.error(
          'Could not process event.',
          error: e,
          trace: stack,
        );
        return;
      }

      // Step 2: acknowledge. Only a StateError from `ack()` itself is
      // reported as an ack failure; anything else rejects the event.
      try {
        event.ack();
      } on StateError catch (e, stack) {
        _log.error(
          'Could not sent ack.',
          error: e,
          trace: stack,
        );
      } catch (e, stack) {
        event.reject(true);
        _log.error(
          'Could not process event.',
          error: e,
          trace: stack,
        );
      }
    },
    cancelOnError: true,
    onDone: () {
      _log.warn('done?');
    },
    onError: (e, stack) async {
      _log.warn(
        'HubSocket subscription failed, restarting.',
        error: e,
        trace: stack,
      );
      // Best-effort cancel: a failure or timeout here is logged but must not
      // prevent the restart below.
      try {
        await sub.cancel().timeout(Duration(seconds: 30));
      } catch (e, stack) {
        _log.error(
          'Could not cancel subscription.',
          error: e,
          trace: stack,
        );
      }
      _subscriptions.remove(sub);
      // Short pause before re-subscribing with the same arguments.
      await Future.delayed(const Duration(seconds: 3));
      listen(hubSocket, listener, prefetch: prefetch);
    },
  );
  _subscriptions.add(sub);
}