subscribeToSSE static method
def: Subscribes to SSE
param:
method
->Request method ie: GET/POST
url
->URl of the SSE api
header
->Map<String,String>, key value pair of the request header
Implementation
static Stream<SSEModel> subscribeToSSE(
{required SSERequestType method,
required String url,
required Map<String, String> header,
Map<String, dynamic>? body}) {
var lineRegex = RegExp(r'^([^:]*)(?::)?(?: )?(.*)?$');
var currentSSEModel = SSEModel(data: '', id: '', event: '');
// ignore: close_sinks
StreamController<SSEModel> streamController = new StreamController();
print("--SUBSCRIBING TO SSE---");
while (true) {
try {
_client = http.Client();
var request = new http.Request(
method == SSERequestType.GET ? "GET" : "POST",
Uri.parse(url),
);
///Adding headers to the request
header.forEach((key, value) {
request.headers[key] = value;
});
///Adding body to the request if exists
if (body != null) {
request.body = jsonEncode(body);
}
Future<http.StreamedResponse> response = _client.send(request);
///Listening to the response as a stream
response.asStream().listen((data) {
///Applying transforms and listening to it
data.stream
..transform(Utf8Decoder()).transform(LineSplitter()).listen(
(dataLine) {
if (dataLine.isEmpty) {
///This means that the complete event set has been read.
///We then add the event to the stream
streamController.add(currentSSEModel);
currentSSEModel = SSEModel(data: '', id: '', event: '');
return;
}
///Get the match of each line through the regex
Match match = lineRegex.firstMatch(dataLine)!;
var field = match.group(1);
if (field!.isEmpty) {
return;
}
var value = '';
if (field == 'data') {
//If the field is data, we get the data through the substring
value = dataLine.substring(
5,
);
} else {
value = match.group(2) ?? '';
}
switch (field) {
case 'event':
currentSSEModel.event = value;
break;
case 'data':
currentSSEModel.data =
(currentSSEModel.data ?? '') + value + '\n';
break;
case 'id':
currentSSEModel.id = value;
break;
case 'retry':
break;
}
},
onError: (e, s) {
print('---ERROR---');
print(e);
streamController.addError(e, s);
},
);
}, onError: (e, s) {
print('---ERROR---');
print(e);
streamController.addError(e, s);
});
} catch (e, s) {
print('---ERROR---');
print(e);
streamController.addError(e, s);
}
Future.delayed(Duration(seconds: 1), () {});
return streamController.stream;
}
}