start method

void start()

Implementation

/// Starts every attached adapter and begins handling publish, subscribe and
/// unsubscribe requests coming from them.
///
/// Throws a [StateError] if no adapters are attached, or if this server has
/// already been started.
void start() {
  if (_adapters.isEmpty) {
    throw StateError(
        'Cannot start a SyncServer that has no adapters attached.');
  } else if (_started) {
    throw StateError('A SyncServer may only be started once.');
  }

  _started = true;

  // Start all adapters first; listeners are attached in a second pass below.
  for (var adapter in _adapters) {
    adapter.start();
  }

  for (var adapter in _adapters) {
    // Handle publishes
    adapter.onPublish.listen((rq) {
      ClientInfo? client;
      String? clientId;

      // A request is only looked up if it carries a non-empty client ID, or
      // if the adapter vouches for it; trusted requests without an ID get a
      // freshly generated one.
      // NOTE(review): presumably `_newClientId()` also registers the new ID
      // in `_clients`, otherwise the lookup below cannot succeed — confirm.
      if (rq.clientId?.isNotEmpty == true ||
          adapter.isTrustedPublishRequest(rq)) {
        clientId =
            rq.clientId?.isNotEmpty == true ? rq.clientId : _newClientId();
        client = _clients.firstWhereOrNull((c) => c.id == clientId);
      }

      if (client == null) {
        rq.reject('Unrecognized client ID "${clientId ?? '<missing>'}".');
      } else if (!client.canPublish) {
        rq.reject('You are not allowed to publish events.');
      } else {
        // Snapshot the matching subscriptions exactly once. `where` alone is
        // lazy, so every `isEmpty`/`length`/iteration would re-filter the
        // live list: the reported count could drift from the number of
        // listeners actually notified, and a change to the list during
        // dispatch would raise a ConcurrentModificationError.
        // The publisher's own subscriptions are excluded.
        var listeners = _subscriptions[rq.eventName]
                ?.where((s) => s.clientId != clientId)
                .toList() ??
            <Subscription>[];

        for (var listener in listeners) {
          listener.dispatch(rq.value);
        }

        // With no listeners this reports 0, as before.
        rq.accept(PublishResponse(listeners.length, clientId));
      }
    });

    // Listen for incoming subscriptions
    adapter.onSubscribe.listen((rq) async {
      ClientInfo? client;
      String? clientId;

      // Same client resolution as for publishes, using the adapter's
      // subscription-specific trust check.
      if (rq.clientId?.isNotEmpty == true ||
          adapter.isTrustedSubscriptionRequest(rq)) {
        clientId =
            rq.clientId?.isNotEmpty == true ? rq.clientId : _newClientId();
        client = _clients.firstWhereOrNull((c) => c.id == clientId);
      }

      if (client == null) {
        rq.reject('Unrecognized client ID "${clientId ?? '<missing>'}".');
      } else if (!client.canSubscribe) {
        rq.reject('You are not allowed to subscribe to events.');
      } else {
        // Accepting yields the subscription, which is then filed under the
        // requested event name.
        var sub = await rq.accept(clientId);
        var list = _subscriptions.putIfAbsent(rq.eventName, () => []);
        list.add(sub);
      }
    });

    // Unregister subscriptions on unsubscribe
    adapter.onUnsubscribe.listen((rq) {
      Subscription? toRemove;
      // Only assigned when `toRemove` is found, and only read in that case.
      late List<Subscription> sourceList;

      // The request carries only a subscription ID, so search every event's
      // list for it.
      for (var list in _subscriptions.values) {
        toRemove = list.firstWhereOrNull((s) => s.id == rq.subscriptionId);
        if (toRemove != null) {
          sourceList = list;
          break;
        }
      }

      if (toRemove == null) {
        rq.reject('The specified subscription does not exist.');
      } else if (toRemove.clientId != rq.clientId) {
        // Clients may only cancel subscriptions they own.
        rq.reject('That is not your subscription to cancel.');
      } else {
        sourceList.remove(toRemove);
        rq.accept();
      }
    });
  }
}