connectWs method

Future<void> connectWs(
  1. WsEndpoint wsEndpoint
)

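Iterates through the available WebSocket URLs and attempts a connection to each in turn. Upon a successful connection it wires up the WebSocket callbacks and re-issues any previously registered subscriptions. Upon an unsuccessful connection it moves on to the next WebSocket URL in the array; if none of them connects, an AllServersFailedException is thrown.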

Implementation

/// Connects [wsEndpoint] to the first usable websocket URL in
/// `_endpointArray`.
///
/// Endpoints are visited in the order produced by `deriveEndpointIndex`. For
/// each URL that passes the `_websocketUrlConnects` probe, a
/// [WebSocketChannel] is opened, its stream is wired to the message, error
/// and done handling of [wsEndpoint], every subscription recorded in
/// `wsEndpoint.subs` is re-issued, `wsEndpoint.connected` is completed and
/// `wsEndpoint.onConnect` is invoked. If any part of that setup throws, the
/// next URL is tried instead.
///
/// When the stream ends and the endpoint was neither closed manually nor has
/// auto-reconnect disabled, `wsEndpoint.onReconnect` is invoked and a new
/// connection attempt is started from the next endpoint. A failure of that
/// background attempt is reported through `wsEndpoint.onError`, if set.
///
/// Throws an [AllServersFailedException] if no websocket URL could be
/// connected.
Future<void> connectWs(WsEndpoint wsEndpoint) async {
  for (var i = 0; i < _endpointArray.length; i += 1) {
    final index = deriveEndpointIndex(i);
    final thisProxyWsUrl = _endpointArray[index].wsUrl;
    final websocketUrlConnects = await _websocketUrlConnects(thisProxyWsUrl);

    if (!websocketUrlConnects) {
      // Probe failed, try the next server.
      continue;
    }

    // Remember the endpoint that answered the probe. This is the only place
    // the index is stored: writing it again after the delay below could
    // overwrite the value advanced by `onDone` if the socket dropped in the
    // meantime.
    _workingIndex = index;

    try {
      final channel = WebSocketChannel.connect(Uri.parse(thisProxyWsUrl));

      channel.stream.listen(
        (message) {
          // Messages are only dispatched when a message callback is set.
          if (wsEndpoint.onMessage != null) {
            wsEndpoint.handleMsg(message as List<int>);
          }
        },
        onError: (error) {
          // The endpoint is only closed when an error callback is registered.
          if (wsEndpoint.onError != null) {
            wsEndpoint.onError!(error);
            wsEndpoint.close();
          }
        },
        onDone: () {
          // End if manually closed or no auto-reconnect
          if (wsEndpoint.manuallyClosed || !wsEndpoint.autoReconnect) {
            wsEndpoint.onEnd?.call();
            return;
          }

          wsEndpoint.onReconnect?.call(Exception('WebSocket disconnected'));

          // Start the reconnect from the endpoint after the current one.
          _workingIndex = (_workingIndex + 1) % _endpointArray.length;

          // The reconnect runs in the background, so nothing awaits this
          // future. Attach an error handler so that a failed retry (for
          // example an AllServersFailedException) is reported through the
          // endpoint's error callback instead of surfacing as an unhandled
          // asynchronous error.
          connectWs(wsEndpoint).catchError((error) {
            wsEndpoint.onError?.call(error);
          });
        },
      );

      // Store the channel in the endpoint
      wsEndpoint.ws = channel;

      // Create the connected completer
      wsEndpoint.connected = Completer<void>();

      // There is no explicit open event handled here; a small delay is used
      // to give the connection time to become ready before resubscribing.
      await Future.delayed(const Duration(milliseconds: 100));

      // Subscribe to all previously-subscribed scripts. Keys are expected in
      // the form `<type>:<payload>`; anything else is skipped.
      for (final scriptKey in wsEndpoint.subs.scripts) {
        final parts = scriptKey.split(':');
        if (parts.length == 2) {
          wsEndpoint.subscribeToScript(ScriptType.from(parts[0]), parts[1]);
        }
      }

      // Subscribe to all previously-subscribed lokadIds
      for (final lokadId in wsEndpoint.subs.lokadIds) {
        wsEndpoint.subscribeToLokadId(lokadId);
      }

      // Subscribe to all previously-subscribed tokenIds
      for (final tokenId in wsEndpoint.subs.tokenIds) {
        wsEndpoint.subscribeToTokenId(tokenId);
      }

      // Subscribe to all previously-subscribed txids
      for (final txid in wsEndpoint.subs.txids) {
        wsEndpoint.subscribeToTxid(txid);
      }

      // Subscribe to blocks method, if previously subscribed
      if (wsEndpoint.subs.blocks) {
        wsEndpoint.subscribeToBlocks();
      }

      // Subscribe to txs method, if previously subscribed
      if (wsEndpoint.subs.txs) {
        wsEndpoint.subscribeToTxs();
      }

      // Complete the connected future
      if (!wsEndpoint.connected!.isCompleted) {
        wsEndpoint.connected!.complete();
      }

      // Call the onConnect callback
      wsEndpoint.onConnect?.call();

      return;
    } catch (_) {
      // Connection setup failed, try next server
      continue;
    }
  }

  // If no websocket URLs connect, throw error
  throw AllServersFailedException(
    'Error connecting to known Chronik websockets',
    failedServers: _endpointArray.map((e) => e.wsUrl).toList(),
  );
}