decodeSseStream function
Decodes a UTF-8 byte stream into SseEvents. Buffers across chunk
boundaries (one event may span several Stream<List<int>> chunks),
joins multi-line data: payloads with \n per the SSE spec, and
filters out keep-alive comments.
Implementation
/// Decodes a UTF-8 byte [source] into a stream of [SseEvent]s.
///
/// Lines are reassembled across chunk boundaries, so a single event may span
/// several chunks of [source]. Within one event, the values of consecutive
/// `data:` lines are joined with `\n`; empty `data:` lines count as lines too,
/// so `data:` followed by `data: x` produces the payload `"\nx"`.
///
/// Lines starting with `:` (comments / keep-alives) are skipped, and any
/// field other than `event` and `data` (for example `id:` or `retry:`) is
/// ignored.
///
/// An event is emitted at each blank line if at least one `event:` or `data:`
/// line was seen since the previous dispatch. Anything still pending when
/// [source] closes without a trailing blank line is emitted as a final event.
Stream<SseEvent> decodeSseStream(Stream<List<int>> source) async* {
  final lines = source.transform(utf8.decoder).transform(const LineSplitter());

  String? eventName;
  // Kept as a list of values instead of a single StringBuffer: buffer
  // emptiness cannot tell "no data line yet" apart from "an empty data line
  // was seen", which would drop empty lines and their `\n` separators.
  final dataLines = <String>[];

  await for (final line in lines) {
    if (line.isEmpty) {
      // Blank line = dispatch whatever has been accumulated, then reset.
      if (dataLines.isNotEmpty || eventName != null) {
        yield SseEvent(event: eventName, data: dataLines.join('\n'));
        eventName = null;
        dataLines.clear();
      }
      continue;
    }

    // SSE comment / keep-alive.
    if (line.startsWith(':')) continue;

    if (line.startsWith('event:')) {
      // Surrounding whitespace of the event name is trimmed.
      eventName = line.substring(6).trim();
      continue;
    }

    if (line.startsWith('data:')) {
      // Strip exactly one leading space; further spaces belong to the value.
      var value = line.substring(5);
      if (value.startsWith(' ')) value = value.substring(1);
      dataLines.add(value);
      continue;
    }

    // Other fields (id:, retry:) are ignored; only event/data are needed.
  }

  // Stream ended without a terminating blank line: flush the pending event.
  if (dataLines.isNotEmpty || eventName != null) {
    yield SseEvent(event: eventName, data: dataLines.join('\n'));
  }
}