listen<TEvent> method

void listen<TEvent>(
  1. HubEventSocket<TEvent> hubSocket,
  2. FutureOr<void> listener(
    1. TEvent event
    ), {
  3. int? prefetch,
})

Subscribes to a HubEventSocket.

Subscribing twice inside of the same service (even across instances) will result in competing consumers. The service is identified via the config value datahub.serviceName.

Implementation

void listen<TEvent>(
  HubEventSocket<TEvent> hubSocket,
  FutureOr<void> Function(TEvent event) listener, {
  int? prefetch,
}) {
  late StreamSubscription sub;
  sub = hubSocket.getStream(prefetch: prefetch).listen(
        (event) async {
          try {
            await listener(event.data);
            event.ack();
          } on StateError catch (e, stack) {
            _log.error(
              'Could not sent ack.',
              error: e,
              trace: stack,
            );
          } catch (e, stack) {
            event.reject(true);
            _log.e(
              'Could not process event.',
              error: e,
              trace: stack,
            );
          }
        },
        cancelOnError: true,
        onDone: () {
          _log.warn('done?');
        },
        onError: (e, stack) async {
          _log.warn(
            'HubSocket subscription failed, restarting.',
            error: e,
            trace: stack,
          );
          try {
            await sub.cancel().timeout(Duration(seconds: 30));
          } catch (e, stack) {
            _log.error(
              'Could not cancel subscription.',
              error: e,
              trace: stack,
            );
          }
          _subscriptions.remove(sub);
          await Future.delayed(const Duration(seconds: 3));
          listen(hubSocket, listener, prefetch: prefetch);
        },
      );
  _subscriptions.add(sub);
}