connect method

  1. @override
Future connect()
override

Opens the WebSocket channel and starts listening for JSON-RPC responses and subscription messages.

Implementation

/// Opens the websocket channel and starts routing incoming JSON-RPC messages.
///
/// Every incoming frame is decoded as a JSON object and must carry
/// `"jsonrpc": "2.0"`. Messages with an integer `id` complete the matching
/// pending entry in `queries`; messages without an `id` whose `params` object
/// contains a `subscription` key are forwarded to the controller returned by
/// `getSubscriptionController`.
///
/// The returned future completes once the channel reports that it is ready.
///
/// Throws an [Exception] if a channel is already set. If the channel fails to
/// become ready, the stored channel is cleared again, so that a later call
/// can retry, and the original error is rethrown.
@override
Future connect() async {
  if (this.channel != null) {
    throw Exception('Already connected');
  }

  _sequence = 0;
  final WebSocketChannel channel = WebSocketChannel.connect(url);

  // Decode and validate each frame once; the stream is broadcast because the
  // command and the subscription pipelines below both listen to it.
  final jsonStream = channel.stream.map((response) {
    if (response == null) {
      // NOTE(review): this message describes the jsonrpc check rather than a
      // null frame; kept unchanged so the emitted text stays the same.
      throw Exception('Invalid jsonrpc field in decoded object');
    }

    final Map<String, dynamic> json;
    try {
      // A decoded value that is not a JSON object fails the assignment and
      // is reported through the same error as malformed JSON.
      json = jsonDecode(response);
    } catch (e) {
      throw Exception('The response is not a valid json object');
    }

    if (!json.containsKey('jsonrpc') || json['jsonrpc'] != '2.0') {
      throw Exception('Invalid jsonrpc field in decoded object');
    }

    return json;
  }).asBroadcastStream();

  // Commands: only integer ids can match a pending query. Filtering on the
  // type (rather than on key presence) keeps a response carrying a null or
  // non-integer id from throwing inside the stream pipeline.
  jsonStream.where((message) => message['id'] is int).map((message) {
    return RpcResponse(
        id: message['id'] as int,
        result: message['result'],
        error: message['error']);
  }).listen((message) {
    // A response whose id is unknown (or already removed) is ignored instead
    // of raising an unhandled error from a null assertion.
    queries.remove(message.id)?.complete(message);
  });

  // Subscriptions: notifications (no id) whose params is an object that
  // names a subscription. Non-object params are skipped rather than cast.
  jsonStream.where((message) {
    if (message.containsKey('id')) {
      return false;
    }
    final params = message['params'];
    return params is Map<String, dynamic> &&
        params.containsKey('subscription');
  }).map((message) {
    final method = message['method'] as String;
    final params = message['params'] as Map<String, dynamic>;
    final subscription = params['subscription'] as String;

    return SubscriptionMessage(
        method: method, subscription: subscription, result: params['result']);
  }).listen((message) {
    final StreamController? controller =
        getSubscriptionController(message.subscription);
    controller?.add(message);
  });

  this.channel = channel;
  try {
    await channel.ready;
  } catch (_) {
    // Without this reset a failed handshake would leave the field set and
    // every later connect() would fail with 'Already connected'.
    if (identical(this.channel, channel)) {
      this.channel = null;
    }
    rethrow;
  }
}