decodeSseStream function

Stream<SseEvent> decodeSseStream(
  1. Stream<List<int>> source
)

Decodes a UTF-8 byte stream into SseEvents. Buffers across chunk boundaries (one event may span several Stream<List<int>> chunks), joins multi-line data: payloads with \n per the SSE spec, and filters out keep-alive comments.

Implementation

Stream<SseEvent> decodeSseStream(Stream<List<int>> source) async* {
  final lines = source.transform(utf8.decoder).transform(const LineSplitter());
  String? eventName;
  final dataBuf = StringBuffer();
  await for (final line in lines) {
    if (line.isEmpty) {
      // Blank line = dispatch.
      if (dataBuf.isNotEmpty || eventName != null) {
        yield SseEvent(event: eventName, data: dataBuf.toString());
        eventName = null;
        dataBuf.clear();
      }
      continue;
    }
    if (line.startsWith(':')) continue; // SSE comment / keep-alive.
    if (line.startsWith('event:')) {
      eventName = line.substring(6).trim();
      continue;
    }
    if (line.startsWith('data:')) {
      // Strip exactly one leading space per spec.
      var chunk = line.substring(5);
      if (chunk.startsWith(' ')) chunk = chunk.substring(1);
      if (dataBuf.isNotEmpty) dataBuf.write('\n');
      dataBuf.write(chunk);
      continue;
    }
    // Other fields (id:, retry:) ignored — we only need event/data.
  }
  // Stream ended on a non-blank trailing line — flush.
  if (dataBuf.isNotEmpty || eventName != null) {
    yield SseEvent(event: eventName, data: dataBuf.toString());
  }
}