connect method
Open websocket channel
Implementation
@override
Future connect() async {
if (this.socket != null) {
throw Exception('Already connected');
}
_sequence = 0;
final WebSocket socket = WebSocket(url);
socket.connection.listen((state) {
if (state is Reconnected) {
for (final query in queries.values) {
socket.send(query.message);
}
}
});
final jsonStream = socket.messages.map((response) {
if (response == null) {
throw Exception('Invalid jsonrpc field in decoded object');
}
final Map<String, dynamic> json;
try {
json = jsonDecode(response);
} catch (e) {
throw Exception('The response is not a valid json object');
}
if (!json.containsKey('jsonrpc') || json['jsonrpc'] != '2.0') {
throw Exception('Invalid jsonrpc field in decoded object');
}
return json;
}).asBroadcastStream();
// Commands
jsonStream
.where((message) => message.containsKey('id'))
.map((message) {
final id = message['id'] as int;
final result = message.containsKey('result') ? message['result'] : null;
final error = message.containsKey('error') ? message['error'] : null;
return RpcResponse(id: id, result: result, error: error);
})
.listen((message) {
queries.remove(message.id)!.completer.complete(message);
});
// Subscriptions
jsonStream
.where(
(message) =>
!message.containsKey('id') &&
message.containsKey('params') &&
(message['params'] as Map<String, dynamic>).containsKey('subscription'),
)
.map((message) {
final method = message['method'] as String;
final params = message['params'] as Map<String, dynamic>;
final subscription = params['subscription'] as String;
final result = params.containsKey('result') ? params['result'] : null;
return SubscriptionMessage(method: method, subscription: subscription, result: result);
})
.listen((message) {
final StreamController? controller = getSubscriptionController(message.subscription);
controller?.add(message);
});
this.socket = socket;
await isReady();
}