GraphQLWebSocketServer constructor
GraphQLWebSocketServer(
- RemoteClient client, {
- Duration? keepAliveInterval,
- Duration? connectionInitWaitTimeout,
Creates the server logic for receiving and responding messages
through the client
. The client.protocol
should be in
the supportedProtocols and that will determine the logic executed.
Implementation
GraphQLWebSocketServer(
this.client, {
this.keepAliveInterval,
this.connectionInitWaitTimeout,
}) {
done.onError((Object e, StackTrace s) async {
if (isTransportWsProtocol) {
await client.closeWithReason(
e is FormatException
? ErrorReason.invalidMessage
: ErrorReason.internalError,
e.toString(),
);
}
return _onDone();
});
_connectionInitTimer = connectionInitWaitTimeout == null
? null
: Timer(connectionInitWaitTimeout!, _connectionInitTimeout);
_clientMessageSubscription = client.stream.listen(
(msg) async {
if (msg.type == OperationMessage.gqlConnectionInit) {
_connectionInitTimer?.cancel();
if (_receivedInit && isTransportWsProtocol) {
return client.closeWithReason(
ErrorReason.tooManyInitialisations,
'Too many initialisation requests',
);
}
_receivedInit = true;
try {
Map<String, Object?>? connectionParams;
if (msg.payload is Map) {
connectionParams = (msg.payload as Map?)?.cast();
} else if (msg.payload != null) {
throw FormatException(
'${msg.type} payload must be a map (object).');
}
final connect = await onConnect(client, connectionParams);
if (!connect) throw false;
_init = true;
client.sink.add(
const OperationMessage(OperationMessage.gqlConnectionAck),
);
if (keepAliveInterval != null) {
final _keepAliveMsg = OperationMessage(
isTransportWsProtocol
? OperationMessage.pong
: OperationMessage.gqlConnectionKeepAlive,
);
client.sink.add(_keepAliveMsg);
_timer ??= Timer.periodic(keepAliveInterval!, (timer) {
client.sink.add(_keepAliveMsg);
});
}
} catch (e) {
final _unauthorized = e == false;
final String message =
_unauthorized ? 'The connection was rejected.' : e.toString();
if (isTransportWsProtocol) {
return client.closeWithReason(
_unauthorized
? ErrorReason.unauthorized
: e is FormatException
? ErrorReason.invalidMessage
: ErrorReason.internalError,
message,
);
} else {
client.sink.add(
OperationMessage(
OperationMessage.gqlConnectionError,
payload: {'message': message},
),
);
}
}
} else if (msg.type == OperationMessage.ping) {
client.sink.add(const OperationMessage(OperationMessage.pong));
} else if (_init) {
if (msg.type == OperationMessage.subscribe ||
msg.type == OperationMessage.gqlStart) {
if (msg.id == null) {
throw FormatException('${msg.type} id is required.');
}
if (isTransportWsProtocol &&
_currentOperationIds.containsKey(msg.id)) {
return client.closeWithReason(
ErrorReason.duplicateSubscriptionId,
'Subscriber for ${msg.id} already exists',
);
}
_currentOperationIds[msg.id!] = null;
if (msg.payload == null) {
throw FormatException('${msg.type} payload is required.');
} else if (msg.payload is! Map) {
throw FormatException(
'${msg.type} payload must be a map (object).');
}
final payload = msg.payload as Map;
final Object? query = payload['query'];
final Object? variables = payload['variables'];
final Object? operationName = payload['operationName'];
final Object? extensions = payload['extensions'];
if (query is! String) {
throw FormatException(
'${msg.type} payload must contain a string named "query".');
}
if (variables is! Map?) {
throw FormatException('${msg.type} payload\'s "variables" field'
' must be a map (object).');
}
if (operationName is! String?) {
throw FormatException(
'${msg.type} payload\'s "operationName" field'
' must be a string.');
}
if (extensions is! Map?) {
throw FormatException('${msg.type} payload\'s "extensions" field'
' must be a map (object).');
}
final result = await onOperation(
msg.id,
query,
variables?.cast<String, dynamic>(),
operationName,
extensions?.cast<String, dynamic>(),
);
if (!result.didExecute) {
client.sink.add(OperationMessage(
OperationMessage.gqlError,
id: msg.id,
payload: result.errors,
));
if (isTransportWsProtocol) {
_currentOperationIds.remove(msg.id);
return;
}
} else {
await _sendData(msg.id!, result);
}
// Don't send complete if the client completed the subscription
if (!isTransportWsProtocol ||
_currentOperationIds.containsKey(msg.id)) {
client.sink.add(
OperationMessage(OperationMessage.gqlComplete, id: msg.id));
}
_currentOperationIds.remove(msg.id);
} else if (msg.type == OperationMessage.gqlComplete ||
msg.type == OperationMessage.gqlStop) {
if (msg.id == null) {
throw FormatException('${msg.type} id is required.');
}
final subs = _currentOperationIds.remove(msg.id);
if (subs != null) {
await subs.cancel();
}
} else if (msg.type == OperationMessage.gqlConnectionTerminate) {
await _clientMessageSubscription.cancel();
await _onDone();
}
} else if (msg.type == OperationMessage.subscribe) {
return client.closeWithReason(
ErrorReason.unauthorized,
'Unauthorized',
);
}
},
onError: _done.completeError,
onDone: _onDone,
cancelOnError: true,
);
}