parseSSEChunk method

List<Map<String, dynamic>> parseSSEChunk(
  1. String chunk
)

Parse a Server-Sent Events (SSE) chunk from OpenAI's streaming API

This method handles incomplete SSE chunks that can be split across network boundaries. It maintains an internal buffer to reconstruct complete SSE events.

Returns:

  • List<Map<String, dynamic>> - List of parsed JSON objects from the chunk
  • Empty list if no valid data found or chunk should be skipped

Throws:

  • ResponseFormatError - If critical parsing errors occur

Implementation

List<Map<String, dynamic>> parseSSEChunk(String chunk) {
  final results = <Map<String, dynamic>>[];

  // Add new chunk to buffer efficiently
  _sseBuffer.write(chunk);

  // Convert to string only when we need to process
  final bufferContent = _sseBuffer.toString();

  // Find complete lines (ending with \n)
  final lastNewlineIndex = bufferContent.lastIndexOf('\n');

  if (lastNewlineIndex == -1) {
    // No complete lines yet, keep buffering
    return results;
  }

  // Extract complete lines for processing
  final completeContent = bufferContent.substring(0, lastNewlineIndex + 1);
  final remainingContent = bufferContent.substring(lastNewlineIndex + 1);

  // Update buffer with remaining incomplete content
  _sseBuffer.clear();
  if (remainingContent.isNotEmpty) {
    _sseBuffer.write(remainingContent);
  }

  // Process complete lines
  final lines = completeContent.split('\n');
  for (final line in lines) {
    final trimmedLine = line.trim();

    if (trimmedLine.isEmpty) continue;

    if (trimmedLine.startsWith('data: ')) {
      final data = trimmedLine.substring(6).trim();

      // Handle completion signal
      if (data == '[DONE]') {
        // Clear buffer and return empty list to signal completion
        _sseBuffer.clear();
        return [];
      }

      // Skip empty data
      if (data.isEmpty) {
        continue;
      }

      try {
        final json = jsonDecode(data);
        if (json is! Map<String, dynamic>) {
          logger.warning('SSE chunk is not a JSON object: $data');
          continue;
        }

        // Check for error in the SSE data
        if (json.containsKey('error')) {
          final error = json['error'] as Map<String, dynamic>?;
          if (error != null) {
            final message = error['message'] as String? ?? 'Unknown error';
            final type = error['type'] as String?;
            final code = error['code'] as String?;

            throw ResponseFormatError(
              'SSE stream error: $message${type != null ? ' (type: $type)' : ''}${code != null ? ' (code: $code)' : ''}',
              data,
            );
          }
        }

        results.add(json);
      } catch (e) {
        if (e is LLMError) rethrow;

        // Log and skip malformed JSON chunks, but don't fail the entire stream
        logger.warning('Failed to parse SSE chunk JSON: $e, data: $data');
        continue;
      }
    }
  }

  return results;
}