connect method

  1. @override
Future connect()
override

Open websocket channel

Implementation

@override
Future connect() async {
  if (this.socket != null) {
    throw Exception('Already connected');
  }

  _sequence = 0;

  final WebSocket socket = WebSocket(url);

  socket.connection.listen((state) {
    if (state is Reconnected) {
      for (final query in queries.values) {
        socket.send(query.message);
      }
    }
  });

  final jsonStream = socket.messages.map((response) {
    if (response == null) {
      throw Exception('Invalid jsonrpc field in decoded object');
    }

    final Map<String, dynamic> json;
    try {
      json = jsonDecode(response);
    } catch (e) {
      throw Exception('The response is not a valid json object');
    }

    if (!json.containsKey('jsonrpc') || json['jsonrpc'] != '2.0') {
      throw Exception('Invalid jsonrpc field in decoded object');
    }

    return json;
  }).asBroadcastStream();

  // Commands
  jsonStream.where((message) => message.containsKey('id')).map((message) {
    final id = message['id'] as int;
    final result = message.containsKey('result') ? message['result'] : null;
    final error = message.containsKey('error') ? message['error'] : null;
    return RpcResponse(id: id, result: result, error: error);
  }).listen((message) {
    queries.remove(message.id)!.completer.complete(message);
  });

  // Subscriptions
  jsonStream
      .where((message) =>
          !message.containsKey('id') &&
          message.containsKey('params') &&
          (message['params'] as Map<String, dynamic>).containsKey('subscription'))
      .map((message) {
    final method = message['method'] as String;
    final params = message['params'] as Map<String, dynamic>;
    final subscription = params['subscription'] as String;
    final result = params.containsKey('result') ? params['result'] : null;

    return SubscriptionMessage(method: method, subscription: subscription, result: result);
  }).listen((message) {
    final StreamController? controller = getSubscriptionController(message.subscription);
    controller?.add(message);
  });

  this.socket = socket;
  await isReady();
}