request method
A function called when a request reaches this Link
Implementation
@override
Stream<Response> request(Request request, [forward]) async* {
final String id = uuid.v4();
final requestWithContext = request.withContextEntry<RequestId>(
RequestId(id),
);
_requests.add(requestWithContext);
if (_connectionStateController.value == ConnectionState.closed) {
await _connect();
}
final StreamController<Response> response = StreamController();
StreamSubscription<GraphQLSocketMessage>? messagesSubscription;
response.onListen = () {
final Stream<ConnectionState> waitForConnectedState =
_connectionStateController
.where((state) => state == ConnectionState.open)
.take(1);
waitForConnectedState.listen((_) {
// listen for response messages
messagesSubscription = _messagesController.stream
.where((message) =>
(message is SubscriptionData && message.id == id) ||
(message is SubscriptionError && message.id == id) ||
(message is SubscriptionComplete && message.id == id))
.takeWhile((_) => !response.isClosed)
.listen(
(message) {
if (message is SubscriptionData || message is SubscriptionError) {
try {
final parsed = _parseMessage(message);
if (parsed.data == null && parsed.errors == null) {
throw WebSocketLinkServerException(
originalException: null,
parsedResponse: parsed,
requestMessage: null,
);
}
response.add(parsed);
} catch (e) {
response.addError(e);
}
} else if (message is SubscriptionComplete) {
response.close();
}
},
onError: response.addError,
onDone: response.close,
);
// Send the request.
_write(
StartOperation(
id,
serializer.serializeRequest(requestWithContext),
),
).catchError(response.addError);
});
};
response.onCancel = () {
if (isDisabled) {
return;
}
messagesSubscription?.cancel();
_write(StopOperation(id)).catchError(response.addError);
_requests.removeWhere((e) => e.context.entry<RequestId>()!.id == id);
};
yield* response.stream;
}