sseStream method

Stream<String> sseStream(
  1. String endpoint, {
  2. String method = 'GET',
  3. Map<String, String>? headers,
  4. dynamic body,
  5. Map<String, dynamic>? queryParameters,
})

Connects to a Server-Sent Events (SSE) source and returns a Stream.

This is the preferred method for functional programming styles or using StreamBuilder in Flutter.

Implementation

Stream<String> sseStream(
  String endpoint, {
  String method = 'GET',
  Map<String, String>? headers,
  dynamic body,
  Map<String, dynamic>? queryParameters,
}) async* {
  try {
    final url = _buildUrl(endpoint);
    final uri = Uri.parse(url).replace(queryParameters: queryParameters);
    final requestHeaders = _buildHeaders(headers);
    requestHeaders['Accept'] = 'text/event-stream';
    requestHeaders['Cache-Control'] = 'no-cache';

    final request = http.Request(method.toUpperCase(), uri);
    request.headers.addAll(requestHeaders);

    if (body != null &&
        (method.toUpperCase() == 'POST' ||
            method.toUpperCase() == 'PUT' ||
            method.toUpperCase() == 'PATCH')) {
      request.body = jsonEncode(body);
    }

    final response = await _client.send(request);

    if (response.statusCode >= 400) {
      throw HttpException(
        statusCode: response.statusCode,
        body: 'SSE connection failed',
      );
    }

    final stream = response.stream
        .transform(utf8.decoder)
        .transform(const LineSplitter());

    String currentData = '';

    await for (final line in stream) {
      if (line.isEmpty) {
        if (currentData.isNotEmpty) {
          yield currentData;
          currentData = '';
        }
        continue;
      }

      if (line.startsWith('data:')) {
        final data = line.substring(5).trim();
        currentData = currentData.isEmpty ? data : '$currentData\n$data';
      }
    }

    if (currentData.isNotEmpty) yield currentData;
  } on http.ClientException catch (e) {
    throw TuulException(
      code: 'SSE_NETWORK_ERROR',
      message: 'SSE network error occurred',
      details: e.message,
    );
  } on HttpException {
    rethrow;
  } catch (e) {
    throw TuulException(
      code: 'SSE_ERROR',
      message: 'SSE stream error',
      details: e.toString(),
    );
  }
}