connect method

  1. @override
Future connect()
override

Opens the WebSocket channel.

Implementation

/// Opens the WebSocket channel and starts routing incoming messages.
///
/// Every incoming frame is decoded as a JSON-RPC 2.0 object and dispatched:
///
/// * objects with an integer `id` complete the matching pending entry in
///   [queries];
/// * objects without an `id` whose `params` carry a `subscription` are
///   forwarded to the controller returned by [getSubscriptionController].
///
/// Whenever the socket reports a [Reconnected] state, the message of every
/// query still present in [queries] is sent again.
///
/// The returned future completes once [isReady] completes.
///
/// Throws an [Exception] if a socket is already set on this instance.
@override
Future connect() async {
  if (this.socket != null) {
    throw Exception('Already connected');
  }

  _sequence = 0;

  final socket = WebSocket(url);

  // Replay every query that is still waiting for a response after the
  // connection has been re-established.
  socket.connection.listen((state) {
    if (state is Reconnected) {
      for (final query in queries.values) {
        socket.send(query.message);
      }
    }
  });

  // Decode each frame once and share the result between both consumers
  // below. An exception thrown while decoding is delivered as an error event
  // on the resulting stream.
  final jsonStream = socket.messages.map((response) {
    if (response == null) {
      throw Exception('The response is null');
    }

    final Map<String, dynamic> json;
    try {
      // The bare catch is deliberate: it covers malformed JSON as well as a
      // frame that is not a string or a payload that is not a JSON object.
      json = jsonDecode(response);
    } catch (e) {
      throw Exception('The response is not a valid json object');
    }

    if (json['jsonrpc'] != '2.0') {
      throw Exception('Invalid jsonrpc field in decoded object');
    }

    return json;
  }).asBroadcastStream();

  // Commands: responses to requests issued by this client. Only integer ids
  // are considered; anything else cannot match an entry looked up by
  // `RpcResponse.id` and would otherwise fail the cast below.
  jsonStream
      .where((message) => message['id'] is int)
      .map(
        (message) => RpcResponse(
          id: message['id'] as int,
          result: message['result'],
          error: message['error'],
        ),
      )
      .listen((response) {
        // A response without a pending query (unknown or already completed
        // id) is dropped instead of crashing on a null assertion.
        queries.remove(response.id)?.completer.complete(response);
      });

  // Subscriptions: notifications (no `id`) that name a subscription. The
  // shape is validated up front so the casts in `map` cannot throw.
  jsonStream
      .where((message) {
        if (message.containsKey('id')) {
          return false;
        }
        final params = message['params'];
        return message['method'] is String &&
            params is Map<String, dynamic> &&
            params['subscription'] is String;
      })
      .map((message) {
        final params = message['params'] as Map<String, dynamic>;
        return SubscriptionMessage(
          method: message['method'] as String,
          subscription: params['subscription'] as String,
          result: params['result'],
        );
      })
      .listen((notification) {
        final StreamController? controller =
            getSubscriptionController(notification.subscription);
        controller?.add(notification);
      });

  this.socket = socket;
  await isReady();
}