connect method

  1. @override
Future<void> connect(
  1. String? url,
  2. TransferFormat transferFormat
)
override

Implementation

@override
Future<void> connect(String? url, TransferFormat transferFormat) async {
  assert(!isStringEmpty(url));
  _logger?.finest("(SSE transport) Connecting");

  // set url before accessTokenFactory because this.url is only for send and we set the auth header instead of the query string for send
  _url = url;

  if (_accessTokenFactory != null) {
    final token = await _accessTokenFactory!();
    if (!isStringEmpty(token)) {
      final encodedToken = Uri.encodeComponent(token);
      url = url! +
          (url.indexOf("?") < 0 ? "?" : "&") +
          "access_token=$encodedToken";
    }
  }

  var opened = false;
  if (transferFormat != TransferFormat.Text) {
    return Future.error(GeneralError(
        "The Server-Sent Events transport only supports the 'Text' transfer format"));
  }

  SseChannel client;
  try {
    client = SseChannel.connect(Uri.parse(url!));
    _logger?.finer('(SSE transport) connected to $url');
    opened = true;
    _sseClient = client;
  } catch (e) {
    return Future.error(e);
  }

  _sseClient!.stream.listen((data) {
    if (onReceive != null) {
      try {
        _logger?.finest(
            '(SSE transport) data received. ${getDataDetail(data, _logMessageContent)}.');
        onReceive!(data);
      } catch (error) {
        _close(error: error);
        return;
      }
    }
  }, onError: (e) {
    _logger?.severe('(SSE transport) error when listening to stream: $e');
    if (opened) {
      _close(error: e);
    }
  });
}