start method
void
start()
Implementation
/// Starts every attached adapter and begins servicing the publish,
/// subscribe and unsubscribe requests each adapter emits.
///
/// Throws a [StateError] if no adapters are attached, or if `start()` has
/// already been called on this instance.
void start() {
  if (_adapters.isEmpty) {
    throw StateError(
        'Cannot start a SyncServer that has no adapters attached.');
  } else if (_started) {
    throw StateError('A SyncServer may only be started once.');
  }

  _started = true;

  // Start all adapters first; listeners are attached in a second pass.
  for (var adapter in _adapters) {
    adapter.start();
  }

  for (var adapter in _adapters) {
    // Handle publishes
    adapter.onPublish.listen((rq) {
      ClientInfo? client;
      String? clientId;

      // A client is only looked up when the request carries an ID, or when
      // the adapter vouches for the request. In the latter case a fresh ID
      // is generated.
      // NOTE(review): `_newClientId()` presumably also registers the new
      // client in `_clients`, otherwise the lookup below cannot succeed —
      // confirm against its definition.
      if (rq.clientId?.isNotEmpty == true ||
          adapter.isTrustedPublishRequest(rq)) {
        clientId =
            rq.clientId?.isNotEmpty == true ? rq.clientId : _newClientId();
        client = _clients.firstWhereOrNull((c) => c.id == clientId);
      }

      if (client == null) {
        rq.reject('Unrecognized client ID "${clientId ?? '<missing>'}".');
      } else if (!client.canPublish) {
        rq.reject('You are not allowed to publish events.');
      } else {
        // Snapshot the matching subscriptions (everyone except the
        // publisher). `where` is lazy, so without `toList()` the filter
        // would be re-run over the live list for the loop and again for
        // `length`, and any change to that list during dispatch would throw
        // a ConcurrentModificationError or skew the reported count.
        var listeners = _subscriptions[rq.eventName]
                ?.where((s) => s.clientId != clientId)
                .toList() ??
            <Subscription>[];

        for (var listener in listeners) {
          listener.dispatch(rq.value);
        }

        // With no listeners the loop is a no-op and the count is 0.
        rq.accept(PublishResponse(listeners.length, clientId));
      }
    });

    // Listen for incoming subscriptions
    adapter.onSubscribe.listen((rq) async {
      ClientInfo? client;
      String? clientId;

      // Same client resolution as for publishes, using the adapter's
      // subscription-specific trust check.
      if (rq.clientId?.isNotEmpty == true ||
          adapter.isTrustedSubscriptionRequest(rq)) {
        clientId =
            rq.clientId?.isNotEmpty == true ? rq.clientId : _newClientId();
        client = _clients.firstWhereOrNull((c) => c.id == clientId);
      }

      if (client == null) {
        rq.reject('Unrecognized client ID "${clientId ?? '<missing>'}".');
      } else if (!client.canSubscribe) {
        rq.reject('You are not allowed to subscribe to events.');
      } else {
        // Register the subscription only after the request has been
        // accepted; the per-event list is created on first use.
        var sub = await rq.accept(clientId);
        var list = _subscriptions.putIfAbsent(rq.eventName, () => []);
        list.add(sub);
      }
    });

    // Unregister subscriptions on unsubscribe
    adapter.onUnsubscribe.listen((rq) {
      Subscription? toRemove;
      // Only read in the final branch, where `toRemove` is non-null and
      // `sourceList` has therefore been assigned.
      late List<Subscription> sourceList;

      // Subscriptions are stored per event name, so search every list for
      // the requested subscription ID.
      for (var list in _subscriptions.values) {
        toRemove = list.firstWhereOrNull((s) => s.id == rq.subscriptionId);
        if (toRemove != null) {
          sourceList = list;
          break;
        }
      }

      if (toRemove == null) {
        rq.reject('The specified subscription does not exist.');
      } else if (toRemove.clientId != rq.clientId) {
        // Clients may only cancel their own subscriptions.
        rq.reject('That is not your subscription to cancel.');
      } else {
        sourceList.remove(toRemove);
        rq.accept();
      }
    });
  }
}