sse method

Future<void> sse(
  String endpoint, {
  String method = 'GET',
  Map<String, String>? headers,
  dynamic body,
  Map<String, dynamic>? queryParameters,
  required void onChunk(String chunk),
  void onDone()?,
  void onError(Object error, StackTrace stackTrace)?,
})

Initiates a Server-Sent Events (SSE) connection using a callback mechanism.

Useful for UI updates where you want to provide a specific `onChunk` listener. Automatically parses the standard SSE `data:` field, joining multi-line payloads with newlines; `event:` lines and `:` comment lines are recognized but ignored.

Implementation

/// Opens a Server-Sent Events (SSE) stream and reports it through callbacks.
///
/// The request is sent to [endpoint] (resolved through `_buildUrl`) using
/// [method], with `Accept: text/event-stream` and `Cache-Control: no-cache`
/// added on top of the headers produced by `_buildHeaders`.
///
/// * [queryParameters], when non-null, replaces the query string of the
///   resolved URL. Values that are neither `String` nor `null` are converted
///   with `toString()` (element-wise for iterables), because [Uri] rejects
///   anything else.
/// * [body] is JSON-encoded and attached only for `POST`, `PUT` and `PATCH`.
/// * [onChunk] receives the payload of every event: the values of its `data:`
///   lines joined with `\n`. Per the SSE format only a single leading space
///   after the colon is removed, so whitespace inside the payload survives.
///   Events whose payload is empty are skipped. `event:` lines, `:` comments
///   and any other field are ignored.
/// * [onDone] is invoked once the stream has ended without an error.
/// * [onError] receives failures instead of having them thrown:
///   an `http.ClientException` is wrapped in a `TuulException` with code
///   `SSE_NETWORK_ERROR`, an `HttpException` (status code >= 400) is passed
///   through unchanged, and anything else is wrapped in a `TuulException`
///   with code `SSE_ERROR`. When [onError] is `null` the original error is
///   rethrown to the caller.
Future<void> sse(
  String endpoint, {
  String method = 'GET',
  Map<String, String>? headers,
  dynamic body,
  Map<String, dynamic>? queryParameters,
  required void Function(String chunk) onChunk,
  void Function()? onDone,
  void Function(Object error, StackTrace stackTrace)? onError,
}) async {
  // Uri only accepts String / Iterable<String> (or null) query values, so
  // numbers, booleans, etc. have to be stringified before they reach it.
  Object? encodeQueryValue(Object? value) {
    if (value == null || value is String) return value;
    if (value is Iterable) return value.map((item) => '$item').toList();
    return '$value';
  }

  try {
    final verb = method.toUpperCase();
    final url = _buildUrl(endpoint);
    final uri = Uri.parse(url).replace(
      queryParameters: queryParameters?.map(
        (key, value) => MapEntry(key, encodeQueryValue(value)),
      ),
    );
    final requestHeaders = _buildHeaders(headers);
    requestHeaders['Accept'] = 'text/event-stream';
    requestHeaders['Cache-Control'] = 'no-cache';

    final request = http.Request(verb, uri);
    request.headers.addAll(requestHeaders);

    // Only methods that conventionally carry a payload get a body.
    if (body != null && const {'POST', 'PUT', 'PATCH'}.contains(verb)) {
      request.body = jsonEncode(body);
    }

    final response = await _client.send(request);

    if (response.statusCode >= 400) {
      // The error body is never read; cancel it so the underlying connection
      // is released instead of being left open.
      await response.stream.listen(null).cancel();
      throw HttpException(
        statusCode: response.statusCode,
        body: 'SSE connection failed',
      );
    }

    final lines = response.stream
        .transform(utf8.decoder)
        .transform(const LineSplitter());

    // Values of the `data:` lines seen since the last dispatched event.
    final pendingData = <String>[];

    // Emits the buffered event, if any, and resets the buffer.
    void flush() {
      if (pendingData.isEmpty) return;
      final payload = pendingData.join('\n');
      pendingData.clear();
      if (payload.isNotEmpty) onChunk(payload);
    }

    await for (final line in lines) {
      if (line.isEmpty) {
        // A blank line terminates the current event.
        flush();
      } else if (line.startsWith('data:')) {
        final value = line.substring(5);
        // The SSE format strips exactly one leading space after the colon;
        // any further whitespace belongs to the payload.
        pendingData.add(value.startsWith(' ') ? value.substring(1) : value);
      }
      // `event:` lines, `:` comments and unknown fields are ignored.
    }

    // Deliver an event that was not terminated by a blank line before EOF.
    flush();
    onDone?.call();
  } on http.ClientException catch (e, stackTrace) {
    final exception = TuulException(
      code: 'SSE_NETWORK_ERROR',
      message: 'SSE network error occurred',
      details: e.message,
    );
    if (onError != null) {
      onError(exception, stackTrace);
    } else {
      rethrow;
    }
  } on HttpException catch (e, stackTrace) {
    if (onError != null) {
      onError(e, stackTrace);
    } else {
      rethrow;
    }
  } catch (e, stackTrace) {
    final exception = TuulException(
      code: 'SSE_ERROR',
      message: 'SSE stream error',
      details: e.toString(),
    );
    if (onError != null) {
      onError(exception, stackTrace);
    } else {
      rethrow;
    }
  }
}