subscribe method
Sends a query, mutation or subscription request to the server, and returns a stream of the response.
If the request is a query or mutation, a timeout will be applied to the request as specified by
SocketClientConfig's queryAndMutationTimeout
field.
If the request is a subscription, no timeout is applied.
In case of socket disconnection, the returned stream will be closed, unless the config's autoReconnect is enabled, in which case the stream is kept open.
Implementation
/// Sends a query, mutation or subscription request to the server and returns
/// a stream of the responses.
///
/// [payload] is the request to send. When [waitForConnection] is true the
/// operation is only started once the connection state stream emits
/// [SocketConnectionState.connected]; when false the connected state is
/// assumed immediately.
///
/// For queries and mutations, `config.queryAndMutationTimeout` (when set) is
/// applied both while waiting for the connection and while waiting for the
/// completion message; on expiry a [TimeoutException] is added to the
/// returned stream and the stream is closed. Subscriptions never time out.
///
/// When the underlying message stream ends or errors, the returned stream is
/// closed only if `config.autoReconnect` is false.
Stream<Response> subscribe(
  final Request payload,
  final bool waitForConnection,
) {
  final String id = _uuid.v4(
    options: {
      'random': randomBytesForUuid,
    },
  ).toString();
  final StreamController<Response> response = StreamController<Response>();
  StreamSubscription<SocketConnectionState>? sub;
  // Listeners attached to the message stream for this operation. They are
  // tracked so that onCancel can release them (and any pending timeout timer)
  // instead of leaving them alive until the next matching message arrives.
  final List<StreamSubscription<dynamic>> messageSubscriptions =
      <StreamSubscription<dynamic>>[];
  final bool addTimeout =
      !payload.isSubscription && config.queryAndMutationTimeout != null;

  // Reports a timeout on the response stream and closes both streams.
  // Adding to an already closed controller throws, hence the guard.
  void failWithTimeout(EventSink<dynamic> sink, String message) {
    if (!response.isClosed) {
      response.addError(TimeoutException(message));
    }
    sink.close();
    response.close();
  }

  // True for data/next/error/complete messages that carry this operation's id.
  bool isForThisOperation(GraphQLSocketMessage message) =>
      (message is SubscriptionData && message.id == id) ||
      (message is SubscriptionNext && message.id == id) ||
      (message is SubscriptionError && message.id == id) ||
      (message is SubscriptionComplete && message.id == id);

  // Closes the response stream unless the client is configured to reconnect.
  void closeUnlessReconnecting() {
    if (!config.autoReconnect) {
      response.close();
    }
  }

  void onListen() {
    final Stream<SocketConnectionState> waitForConnectedStateWithoutTimeout =
        (waitForConnection
                ? _connectionStateController
                : _connectionStateController
                    .startWith(SocketConnectionState.connected))
            .where((SocketConnectionState state) =>
                state == SocketConnectionState.connected)
            .take(1);

    final Stream<SocketConnectionState> waitForConnectedState = addTimeout
        ? waitForConnectedStateWithoutTimeout.timeout(
            config.queryAndMutationTimeout!,
            onTimeout: (EventSink<SocketConnectionState> event) =>
                failWithTimeout(event, 'Connection timed out.'),
          )
        : waitForConnectedStateWithoutTimeout;

    sub = waitForConnectedState.listen((_) {
      // Messages for this operation, until the response is closed or the
      // client is disposed.
      final Stream<GraphQLSocketMessage> dataErrorComplete = _messages
          .where(isForThisOperation)
          .takeWhile((_) => !response.isClosed && !_wasDisposed);

      final Stream<GraphQLSocketMessage> firstComplete = dataErrorComplete
          .where((message) => message is SubscriptionComplete)
          .take(1);

      final Stream<GraphQLSocketMessage> subscriptionComplete = addTimeout
          ? firstComplete.timeout(
              config.queryAndMutationTimeout!,
              onTimeout: (EventSink<GraphQLSocketMessage> event) =>
                  failWithTimeout(event, 'Request timed out.'),
            )
          : firstComplete;

      messageSubscriptions.add(subscriptionComplete.listen(
        (_) => response.close(),
        onDone: closeUnlessReconnecting,
        onError: (_) => closeUnlessReconnecting(),
      ));

      messageSubscriptions.add(dataErrorComplete
          .whereType<SubscriptionData>()
          .listen((message) => response.add(
                parse(message.toJson()),
              )));

      messageSubscriptions.add(dataErrorComplete
          .whereType<SubscriptionNext>()
          .listen((message) => response.add(
                parse(message.toJson()),
              )));

      messageSubscriptions.add(dataErrorComplete
          .whereType<SubscriptionError>()
          .listen((message) => response.addError(message)));

      // Send the operation only once per registered initializer. If the
      // initializer has already been removed (the consumer cancelled), there
      // is nothing to start.
      final initializer = _subscriptionInitializers[id];
      if (initializer != null && !initializer.hasBeenTriggered) {
        final GraphQLSocketMessage operation =
            protocol == GraphQLProtocol.graphqlTransportWs
                ? SubscribeOperation(id, serialize(payload))
                : StartOperation(id, serialize(payload));
        _write(operation);
        initializer.hasBeenTriggered = true;
      }
    });
  }

  response.onListen = onListen;
  response.onCancel = () {
    _subscriptionInitializers.remove(id);
    sub?.cancel();
    for (final StreamSubscription<dynamic> subscription
        in messageSubscriptions) {
      subscription.cancel();
    }
    messageSubscriptions.clear();
    // Only the graphql-ws protocol branch sends an explicit stop message, and
    // only while the socket is still connected.
    if (protocol == GraphQLProtocol.graphqlWs &&
        _connectionStateController.value == SocketConnectionState.connected &&
        socketChannel != null) {
      _write(StopOperation(id));
    }
  };
  _subscriptionInitializers[id] = SubscriptionListener(onListen, false);
  return response.stream;
}