sseStream method
Opens a Server-Sent Events (SSE) connection and returns a Stream<String> that emits the data payload of each received event.
Implementation
/// Opens a Server-Sent Events (SSE) connection to [endpoint] and yields the
/// `data` payload of each received event.
///
/// The request is sent with [method] (upper-cased) and the headers produced
/// by `_buildHeaders(headers)`, with `Accept: text/event-stream` and
/// `Cache-Control: no-cache` forced on top. [body] is JSON-encoded and
/// attached only for `POST`, `PUT` and `PATCH` requests.
///
/// When [queryParameters] is non-empty it replaces the query of the built
/// URL. Values that are neither `String` nor `null` are converted with
/// `toString()` (element-wise for iterables), because [Uri] rejects other
/// value types.
///
/// Parsing notes:
/// * Multiple `data:` lines of one event are joined with `\n`.
/// * Only the single optional space after the colon is removed; any other
///   whitespace in the payload is preserved.
/// * Comment lines (starting with `:`) and the `event`, `id` and `retry`
///   fields are ignored.
/// * Events whose joined payload is the empty string are not emitted.
/// * Data still buffered when the connection closes is emitted as a final
///   event.
///
/// The returned stream emits an [HttpException] when the response status is
/// 400 or above, a [TuulException] with code `SSE_NETWORK_ERROR` when the
/// client raises an `http.ClientException`, and a [TuulException] with code
/// `SSE_ERROR` for any other failure.
Stream<String> sseStream(
  String endpoint, {
  String method = 'GET',
  Map<String, String>? headers,
  dynamic body,
  Map<String, dynamic>? queryParameters,
}) async* {
  // `Uri` accepts only `String` or `Iterable<String>` query values; anything
  // else (int, bool, ...) would throw, so normalise the values up front.
  Object? queryValue(Object? value) {
    if (value == null || value is String) return value;
    if (value is Iterable) return [for (final item in value) '$item'];
    return '$value';
  }

  // Methods for which a request body is attached.
  const methodsWithBody = {'POST', 'PUT', 'PATCH'};

  try {
    final baseUri = Uri.parse(_buildUrl(endpoint));
    // Skip `replace` for a null/empty map so the URL does not end up with a
    // dangling `?`.
    final uri = (queryParameters == null || queryParameters.isEmpty)
        ? baseUri
        : baseUri.replace(
            queryParameters: <String, dynamic>{
              for (final entry in queryParameters.entries)
                entry.key: queryValue(entry.value),
            },
          );

    final requestHeaders = _buildHeaders(headers);
    requestHeaders['Accept'] = 'text/event-stream';
    requestHeaders['Cache-Control'] = 'no-cache';

    final verb = method.toUpperCase();
    final request = http.Request(verb, uri);
    request.headers.addAll(requestHeaders);
    if (body != null && methodsWithBody.contains(verb)) {
      request.body = jsonEncode(body);
    }

    final response = await _client.send(request);
    if (response.statusCode >= 400) {
      // The body is never read on this path; cancel the stream so the
      // underlying connection is released instead of being left open.
      try {
        await response.stream.listen(null).cancel();
      } catch (_) {
        // Best-effort cleanup: the status-code failure below is the error
        // the caller needs to see.
      }
      throw HttpException(
        statusCode: response.statusCode,
        body: 'SSE connection failed',
      );
    }

    final lines = response.stream
        .transform(utf8.decoder)
        .transform(const LineSplitter());

    // Values of the `data` fields seen since the last dispatched event. A
    // list (rather than one string) keeps empty data lines distinguishable
    // from "no data yet".
    final dataLines = <String>[];

    await for (final line in lines) {
      if (line.isEmpty) {
        // A blank line terminates the current event.
        if (dataLines.isNotEmpty) {
          final payload = dataLines.join('\n');
          dataLines.clear();
          if (payload.isNotEmpty) {
            yield payload;
          }
        }
        continue;
      }

      final colon = line.indexOf(':');
      if (colon == 0) {
        // Comment line.
        continue;
      }

      // A line without a colon is a field name with an empty value.
      final field = colon < 0 ? line : line.substring(0, colon);
      if (field != 'data') {
        // `event`, `id`, `retry` and unknown fields are not surfaced.
        continue;
      }

      var value = colon < 0 ? '' : line.substring(colon + 1);
      // Only one leading space is part of the framing; the rest is payload.
      if (value.startsWith(' ')) {
        value = value.substring(1);
      }
      dataLines.add(value);
    }

    // Emit data that was buffered when the connection closed without a
    // terminating blank line.
    final trailing = dataLines.join('\n');
    if (trailing.isNotEmpty) {
      yield trailing;
    }
  } on http.ClientException catch (e) {
    throw TuulException(
      code: 'SSE_NETWORK_ERROR',
      message: 'SSE network error occurred',
      details: e.message,
    );
  } on HttpException {
    rethrow;
  } catch (e) {
    throw TuulException(
      code: 'SSE_ERROR',
      message: 'SSE stream error',
      details: e.toString(),
    );
  }
}