parseSSEChunk method
Parse a Server-Sent Events (SSE) chunk from OpenAI's streaming API
This method handles incomplete SSE chunks that can be split across network boundaries. It maintains an internal buffer to reconstruct complete SSE events.
Returns:
List<Map<String, dynamic>>
- List of parsed JSON objects from the chunk- Empty list if no valid data found or chunk should be skipped
Throws:
ResponseFormatError
- If critical parsing errors occur
Implementation
List<Map<String, dynamic>> parseSSEChunk(String chunk) {
final results = <Map<String, dynamic>>[];
// Add new chunk to buffer efficiently
_sseBuffer.write(chunk);
// Convert to string only when we need to process
final bufferContent = _sseBuffer.toString();
// Find complete lines (ending with \n)
final lastNewlineIndex = bufferContent.lastIndexOf('\n');
if (lastNewlineIndex == -1) {
// No complete lines yet, keep buffering
return results;
}
// Extract complete lines for processing
final completeContent = bufferContent.substring(0, lastNewlineIndex + 1);
final remainingContent = bufferContent.substring(lastNewlineIndex + 1);
// Update buffer with remaining incomplete content
_sseBuffer.clear();
if (remainingContent.isNotEmpty) {
_sseBuffer.write(remainingContent);
}
// Process complete lines
final lines = completeContent.split('\n');
for (final line in lines) {
final trimmedLine = line.trim();
if (trimmedLine.isEmpty) continue;
if (trimmedLine.startsWith('data: ')) {
final data = trimmedLine.substring(6).trim();
// Handle completion signal
if (data == '[DONE]') {
// Clear buffer and return empty list to signal completion
_sseBuffer.clear();
return [];
}
// Skip empty data
if (data.isEmpty) {
continue;
}
try {
final json = jsonDecode(data);
if (json is! Map<String, dynamic>) {
logger.warning('SSE chunk is not a JSON object: $data');
continue;
}
// Check for error in the SSE data
if (json.containsKey('error')) {
final error = json['error'] as Map<String, dynamic>?;
if (error != null) {
final message = error['message'] as String? ?? 'Unknown error';
final type = error['type'] as String?;
final code = error['code'] as String?;
throw ResponseFormatError(
'SSE stream error: $message${type != null ? ' (type: $type)' : ''}${code != null ? ' (code: $code)' : ''}',
data,
);
}
}
results.add(json);
} catch (e) {
if (e is LLMError) rethrow;
// Log and skip malformed JSON chunks, but don't fail the entire stream
logger.warning('Failed to parse SSE chunk JSON: $e, data: $data');
continue;
}
}
}
return results;
}