createMessageStream method
Stream<StreamEvent>
createMessageStream({
- required List<
Message> messages, - required String systemPrompt,
- List<
ToolDefinition> tools = const [], - int? maxTokens,
override
Stream a message completion via the Gemini streamGenerateContent endpoint.
The endpoint returns SSE events of the form:
data: {"candidates":[{"content":{"parts":[{"text":"..."}]}}]}
These are translated into the app's StreamEvent types so the rest of the codebase can consume them identically to Anthropic or OpenAI streams.
Implementation
@override
Stream<StreamEvent> createMessageStream({
required List<Message> messages,
required String systemPrompt,
List<ToolDefinition> tools = const [],
int? maxTokens,
}) async* {
final body = _buildRequestBody(
messages: messages,
systemPrompt: systemPrompt,
tools: tools,
maxTokens: maxTokens,
);
final url = '${config.baseUrl}/models/${config.model}'
':streamGenerateContent?alt=sse&key=${config.apiKey}';
final request = http.Request('POST', Uri.parse(url));
request.headers.addAll({
'Content-Type': 'application/json',
...config.extraHeaders,
});
request.body = jsonEncode(body);
_log.d('Gemini stream request to ${config.model}');
final client = http.Client();
http.StreamedResponse response;
try {
response = await client.send(request);
} catch (e) {
client.close();
_log.e('Gemini connection error', error: e);
yield ErrorEvent(message: 'Gemini connection error: $e', type: 'network_error');
return;
}
if (response.statusCode != 200) {
final errorBody = await response.stream.bytesToString();
client.close();
_log.e('Gemini API error ${response.statusCode}: $errorBody');
yield ErrorEvent(
message: 'Gemini API error ${response.statusCode}: $errorBody',
type: 'api_error',
);
return;
}
// Emit synthetic message-start event.
final messageId = 'msg_${_uuid.v4()}';
yield MessageStartEvent(messageId: messageId, model: config.model);
var blockIndex = 0;
var hasStartedText = false;
await for (final event in _parseGeminiSSE(response.stream)) {
final candidates = event['candidates'] as List?;
if (candidates == null || candidates.isEmpty) {
// Check for top-level errors.
final error = event['error'] as Map<String, dynamic>?;
if (error != null) {
yield ErrorEvent(
message: error['message'] as String? ?? 'Unknown Gemini error',
type: 'api_error',
);
}
continue;
}
final candidate = candidates[0] as Map<String, dynamic>;
final content = candidate['content'] as Map<String, dynamic>?;
final parts = content?['parts'] as List?;
if (parts != null) {
for (final part in parts) {
final partMap = part as Map<String, dynamic>;
// Handle text content.
final text = partMap['text'] as String?;
if (text != null && text.isNotEmpty) {
if (!hasStartedText) {
yield ContentBlockStartEvent(
index: blockIndex,
block: const TextBlock(''),
);
hasStartedText = true;
}
yield ContentBlockDeltaEvent(index: blockIndex, text: text);
}
// Handle function calls (tool use).
final functionCall = partMap['functionCall'] as Map<String, dynamic>?;
if (functionCall != null) {
if (hasStartedText) {
yield ContentBlockStopEvent(index: blockIndex);
blockIndex++;
hasStartedText = false;
}
final name = functionCall['name'] as String? ?? '';
final args =
(functionCall['args'] as Map<String, dynamic>?) ?? {};
final toolId = 'call_${_uuid.v4()}';
yield ContentBlockStartEvent(
index: blockIndex,
block: ToolUseBlock(id: toolId, name: name, input: args),
);
// Emit the full arguments as a delta for consistency.
yield ContentBlockDeltaEvent(
index: blockIndex,
text: jsonEncode(args),
);
yield ContentBlockStopEvent(index: blockIndex);
blockIndex++;
}
}
}
// Check for finish reason.
final finishReason = candidate['finishReason'] as String?;
if (finishReason != null) {
if (hasStartedText) {
yield ContentBlockStopEvent(index: blockIndex);
hasStartedText = false;
}
final stopReason = _parseFinishReason(finishReason);
final usageMetadata = event['usageMetadata'] as Map<String, dynamic>?;
yield MessageDeltaEvent(
stopReason: stopReason,
usage: usageMetadata != null
? TokenUsage(
inputTokens:
usageMetadata['promptTokenCount'] as int? ?? 0,
outputTokens:
usageMetadata['candidatesTokenCount'] as int? ?? 0,
)
: null,
);
yield const MessageStopEvent();
}
}
client.close();
}