subscribeToSSE static method

Stream<SSEModel> subscribeToSSE({
  1. required SSERequestType method,
  2. required String url,
  3. required Map<String, String> header,
  4. StreamController<SSEModel>? oldStreamController,
  5. Client? client,
  6. Map<String, dynamic>? body,
  7. RetryOptions? retryOptions,
  8. int retryCount = 0,
})

Subscribe to Server-Sent Events.

method is the request method (GET or POST). url is the URL of the SSE endpoint. header is a map of request headers. body is an optional request body for POST requests. oldStreamController stream controller, used to retry to persist the stream from the old connection. client is an optional http client used for testing purpose or custom client. retryOptions is the options for retrying the connection. retryCount is the current retry count.

Returns a Stream of SSEModel representing the SSE events.

Implementation

static Stream<SSEModel> subscribeToSSE({
  required SSERequestType method,
  required String url,
  required Map<String, String> header,
  StreamController<SSEModel>? oldStreamController,
  http.Client? client,
  Map<String, dynamic>? body,
  RetryOptions? retryOptions,
  int retryCount = 0,
}) {
  RetryOptions _retryOptions = retryOptions ?? RetryOptions();
  StreamController<SSEModel> streamController = StreamController();
  if (oldStreamController != null) {
    streamController = oldStreamController;
  }
  var lineRegex = RegExp(r'^([^:]*)(?::)?(?: )?(.*)?$');
  var currentSSEModel = SSEModel(data: '', id: '', event: '');
  _log.info("--SUBSCRIBING TO SSE---");
  while (true) {
    try {
      _client = client ?? http.Client();
      var request = new http.Request(
        method == SSERequestType.GET ? "GET" : "POST",
        Uri.parse(url),
      );

      /// Adding headers to the request
      header.forEach((key, value) {
        request.headers[key] = value;
      });

      /// Adding body to the request if exists
      if (body != null) {
        request.body = jsonEncode(body);
      }

      Future<http.StreamedResponse> response = _client.send(request);

      /// Listening to the response as a stream
      response.asStream().listen((data) {
        if (data.statusCode != 200) {
          _log.severe('---ERROR CODE ${data.statusCode}---');
          _retryConnection(
            method: method,
            url: url,
            header: header,
            body: body,
            streamController: streamController,
            retryOptions: _retryOptions,
            currentRetry: retryCount,
          );
          return;
        }

        /// Applying transforms and listening to it
        data.stream
          ..transform(Utf8Decoder()).transform(LineSplitter()).listen(
            (dataLine) {
              if (dataLine.isEmpty) {
                /// This means that the complete event set has been read.
                /// We then add the event to the stream
                streamController.add(currentSSEModel);
                currentSSEModel = SSEModel(data: '', id: '', event: '');
                return;
              }

              /// Get the match of each line through the regex
              Match match = lineRegex.firstMatch(dataLine)!;
              var field = match.group(1);
              if (field!.isEmpty) {
                return;
              }
              var value = '';
              if (field == 'data') {
                // If the field is data, we get the data through the substring
                value = dataLine.substring(
                  5,
                );
              } else {
                value = match.group(2) ?? '';
              }
              switch (field) {
                case 'event':
                  currentSSEModel.event = value;
                  break;
                case 'data':
                  currentSSEModel.data =
                      (currentSSEModel.data ?? '') + value + '\n';
                  break;
                case 'id':
                  currentSSEModel.id = value;
                  break;
                case 'retry':
                  break;
                default:
                  _log.severe('---ERROR---');
                  _log.severe(dataLine);
                  _retryConnection(
                    method: method,
                    url: url,
                    header: header,
                    streamController: streamController,
                    retryOptions: _retryOptions,
                    currentRetry: retryCount,
                  );
              }
            },
            onError: (e, s) {
              _log.severe('---ERROR---');
              _log.severe(e);
              _retryConnection(
                method: method,
                url: url,
                header: header,
                body: body,
                streamController: streamController,
                currentRetry: retryCount,
                retryOptions: _retryOptions,
              );
            },
          );
      }, onError: (e, s) {
        _log.severe('---ERROR---');
        _log.severe(e);
        _retryConnection(
          method: method,
          url: url,
          header: header,
          body: body,
          streamController: streamController,
          retryOptions: _retryOptions,
          currentRetry: retryCount,
        );
      });
    } catch (e) {
      _log.severe('---ERROR---');
      _log.severe(e);
      _retryConnection(
        method: method,
        url: url,
        header: header,
        body: body,
        streamController: streamController,
        retryOptions: _retryOptions,
        currentRetry: retryCount,
      );
    }
    return streamController.stream;
  }
}