streamCall method

Future<void> streamCall(
  FunctionCall functionCall,
  void onStream(String event, ToolReturn toolReturn),
)
override

Implementation

/// Invokes the tool described by [functionCall] over the streaming JSON-RPC
/// HTTP transport and forwards every received event to [onStream].
///
/// Event handling:
/// * `EventType.START` events are forwarded with the raw event data as the
///   [ToolReturn] result.
/// * `EventType.DATA` and `EventType.ERROR` events are decoded as a
///   [JsonRPCHttpResponseBody]; its `result` is forwarded.
/// * Any other event type is ignored.
/// * When the stream closes, a final `EventType.DONE` event is emitted whose
///   result maps `EventType.DONE` to the function name.
///
/// The returned future completes once the stream is done. It completes with
/// an [OpenToolServerCallException] if the server reports a JSON-RPC error or
/// the underlying stream emits an error. An exception thrown by [onStream]
/// itself is also reported through the returned future.
Future<void> streamCall(
  FunctionCall functionCall,
  void Function(String event, ToolReturn toolReturn) onStream,
) async {
  Stream<String> sseStream = await _streamCallJsonRpcHttp(
    functionCall.id,
    functionCall.name,
    functionCall.arguments,
  );

  final completer = Completer<void>();
  StreamSubscription<String>? subscription;

  subscription = sseStream.listen(
    (sseString) {
      // Exceptions thrown inside a `listen` data callback are NOT delivered to
      // `onError`; they would escape as uncaught zone errors and leave the
      // returned future pending. Catch them here and fail the future instead.
      // NOTE(review): this assumes `_onData` invokes its callback
      // synchronously — confirm against its implementation.
      try {
        _onData(sseString, (String event, Map<String, dynamic> data) {
          if (event == EventType.START) {
            onStream(event, ToolReturn(id: functionCall.id, result: data));
          } else if (event == EventType.DATA || event == EventType.ERROR) {
            final responseBody = JsonRPCHttpResponseBody.fromJson(data);
            final error = responseBody.error;
            if (error != null) {
              throw OpenToolServerCallException(error.message);
            }
            onStream(
              event,
              ToolReturn(id: functionCall.id, result: responseBody.result),
            );
          }
        });
      } catch (e, stackTrace) {
        // The future has already been settled (e.g. by an earlier stream
        // error); there is nowhere left to report to, so keep the original
        // behaviour of letting the error propagate.
        if (completer.isCompleted) rethrow;
        completer.completeError(e, stackTrace);
        // Stop consuming events: the call has failed, so neither further
        // data events nor the trailing DONE event should be emitted.
        subscription?.cancel();
      }
    },
    onDone: () {
      try {
        onStream(
          EventType.DONE,
          ToolReturn(
            id: functionCall.id,
            result: {EventType.DONE: functionCall.name},
          ),
        );
      } catch (e, stackTrace) {
        // Without this, a throwing `onStream` would skip `complete()` below
        // and the returned future would never settle.
        if (completer.isCompleted) rethrow;
        completer.completeError(e, stackTrace);
        return;
      }
      if (!completer.isCompleted) completer.complete();
    },
    onError: (e) {
      // Transport-level stream errors are surfaced as server-call exceptions.
      if (!completer.isCompleted) {
        completer.completeError(OpenToolServerCallException(e.toString()));
      }
    },
  );

  return completer.future;
}