subscribeToSSE static method

Stream<SSEModel> subscribeToSSE({
  1. required SSERequestType method,
  2. required String url,
  3. required Map<String, String> header,
  4. Map<String, dynamic>? body,
})

def: Subscribes to SSE param: method->Request method ie: GET/POST url->URl of the SSE api header->Map<String,String>, key value pair of the request header

Implementation

static Stream<SSEModel> subscribeToSSE(
    {required SSERequestType method,
    required String url,
    required Map<String, String> header,
    Map<String, dynamic>? body}) {
  var lineRegex = RegExp(r'^([^:]*)(?::)?(?: )?(.*)?$');
  var currentSSEModel = SSEModel(data: '', id: '', event: '');
  // ignore: close_sinks
  StreamController<SSEModel> streamController = new StreamController();
  print("--SUBSCRIBING TO SSE---");
  while (true) {
    try {
      _client = http.Client();
      var request = new http.Request(
        method == SSERequestType.GET ? "GET" : "POST",
        Uri.parse(url),
      );

      ///Adding headers to the request
      header.forEach((key, value) {
        request.headers[key] = value;
      });

      ///Adding body to the request if exists
      if (body != null) {
        request.body = jsonEncode(body);
      }

      Future<http.StreamedResponse> response = _client.send(request);

      ///Listening to the response as a stream
      response.asStream().listen((data) {
        ///Applying transforms and listening to it
        data.stream
          ..transform(Utf8Decoder()).transform(LineSplitter()).listen(
            (dataLine) {
              if (dataLine.isEmpty) {
                ///This means that the complete event set has been read.
                ///We then add the event to the stream
                streamController.add(currentSSEModel);
                currentSSEModel = SSEModel(data: '', id: '', event: '');
                return;
              }

              ///Get the match of each line through the regex
              Match match = lineRegex.firstMatch(dataLine)!;
              var field = match.group(1);
              if (field!.isEmpty) {
                return;
              }
              var value = '';
              if (field == 'data') {
                //If the field is data, we get the data through the substring
                value = dataLine.substring(
                  5,
                );
              } else {
                value = match.group(2) ?? '';
              }
              switch (field) {
                case 'event':
                  currentSSEModel.event = value;
                  break;
                case 'data':
                  currentSSEModel.data =
                      (currentSSEModel.data ?? '') + value + '\n';
                  break;
                case 'id':
                  currentSSEModel.id = value;
                  break;
                case 'retry':
                  break;
              }
            },
            onError: (e, s) {
              print('---ERROR---');
              print(e);
              streamController.addError(e, s);
            },
          );
      }, onError: (e, s) {
        print('---ERROR---');
        print(e);
        streamController.addError(e, s);
      });
    } catch (e, s) {
      print('---ERROR---');
      print(e);
      streamController.addError(e, s);
    }

    Future.delayed(Duration(seconds: 1), () {});
    return streamController.stream;
  }
}