sse method

Future<void> sse(
  1. String endpoint, {
  2. String method = 'GET',
  3. Map<String, String>? headers,
  4. dynamic body,
  5. Map<String, dynamic>? queryParameters,
  6. required void onChunk(
    1. String chunk
    ),
  7. void onDone()?,
  8. void onError(
    1. Object error,
    2. StackTrace stackTrace
    )?,
})

Server-Sent Events (SSE) stream with callback Returns raw data as strings, not parsed through AppResponse Calls onChunk callback for each chunk received

Implementation

Future<void> sse(
  String endpoint, {
  String method = 'GET',
  Map<String, String>? headers,
  dynamic body,
  Map<String, dynamic>? queryParameters,
  required void Function(String chunk) onChunk,
  void Function()? onDone,
  void Function(Object error, StackTrace stackTrace)? onError,
}) async {
  try {
    final url = _buildUrl(endpoint);
    final uri = Uri.parse(url).replace(queryParameters: queryParameters);
    final requestHeaders = _buildHeaders(headers);
    requestHeaders['Accept'] = 'text/event-stream';
    requestHeaders['Cache-Control'] = 'no-cache';

    // Create request based on method
    final request = http.Request(method.toUpperCase(), uri);
    request.headers.addAll(requestHeaders);

    if (body != null &&
        (method.toUpperCase() == 'POST' ||
            method.toUpperCase() == 'PUT' ||
            method.toUpperCase() == 'PATCH')) {
      request.body = jsonEncode(body);
    }

    // Send request
    final response = await _client.send(request);

    if (response.statusCode >= 400) {
      throw HttpException(
        statusCode: response.statusCode,
        body: 'SSE connection failed',
      );
    }

    // Process SSE stream
    final stream = response.stream
        .transform(utf8.decoder)
        .transform(const LineSplitter());

    String currentEvent = '';
    String currentData = '';

    await for (final line in stream) {
      if (line.isEmpty) {
        // Empty line indicates end of event
        if (currentData.isNotEmpty) {
          onChunk(currentData);
          currentData = '';
          currentEvent = '';
        }
        continue;
      }

      if (line.startsWith('event:')) {
        currentEvent = line.substring(6).trim();
      } else if (line.startsWith('data:')) {
        final data = line.substring(5).trim();
        if (currentData.isNotEmpty) {
          currentData += '\n$data';
        } else {
          currentData = data;
        }
      } else if (line.startsWith(':')) {
        // Comment line, ignore
        continue;
      }
    }

    // Yield any remaining data
    if (currentData.isNotEmpty) {
      onChunk(currentData);
    }

    // Call onDone callback
    onDone?.call();
  } on http.ClientException catch (e, stackTrace) {
    final exception = TuulException(
      code: 'SSE_NETWORK_ERROR',
      message: 'SSE network error occurred',
      details: e.message,
    );
    if (onError != null) {
      onError(exception, stackTrace);
    } else {
      rethrow;
    }
  } on HttpException catch (e, stackTrace) {
    if (onError != null) {
      onError(e, stackTrace);
    } else {
      rethrow;
    }
  } catch (e, stackTrace) {
    final exception = TuulException(
      code: 'SSE_ERROR',
      message: 'SSE stream error',
      details: e.toString(),
    );
    if (onError != null) {
      onError(exception, stackTrace);
    } else {
      rethrow;
    }
  }
}