getStream<T> static method

Stream<T> getStream<T>({
  required String from,
  required T onSuccess(Map<String, dynamic>),
})

Implementation

/// Sends a GET request to [from] and returns the event-stream response as a
/// stream of [T].
///
/// The response body is decoded as UTF-8 and processed line by line. Every
/// line starting with `data: ` has its payload decoded as a JSON object and
/// passed to [onSuccess]; the result is emitted on the returned stream. Lines
/// without that prefix are ignored. A payload starting with `[DONE]` ends the
/// stream.
///
/// Failures are delivered as error events on the returned stream: a failed
/// request, an error on the response stream, a payload that is not a JSON
/// object, or an exception thrown by [onSuccess].
///
/// The underlying HTTP client is closed when the response ends, when the
/// request fails, or when the listener cancels its subscription.
static Stream<T> getStream<T>({
  required String from,
  required T Function(Map<String, dynamic>) onSuccess,
}) {
  final controller = StreamController<T>();
  final client = http.Client();
  StreamSubscription<String>? subscription;
  var released = false;

  final request = http.Request("GET", Uri.parse(from));
  request.headers.addAll(AIConfigBuilder.instance.headers());

  // Frees the network resources exactly once; it can be reached from the
  // listener cancelling, the response finishing, or the request failing.
  void release() {
    if (released) return;
    released = true;
    subscription?.cancel();
    client.close();
  }

  void close() {
    release();
    if (!controller.isClosed) {
      controller.close();
    }
  }

  // Adding to a closed controller throws, so late errors are dropped.
  void fail(Object error, StackTrace stackTrace) {
    if (!controller.isClosed) {
      controller.addError(error, stackTrace);
    }
  }

  // Stop the request when the consumer stops listening.
  controller.onCancel = release;

  client.send(request).then((streamedResponse) {
    // The listener may have cancelled while the request was in flight.
    if (released) return;

    // Decode and split as stream transformers so that multi-byte characters
    // and lines spanning several network chunks are reassembled correctly.
    subscription = streamedResponse.stream
        .transform(utf8.decoder)
        .transform(const LineSplitter())
        .listen(
      (line) {
        if (!line.startsWith("data: ")) return;

        final payload = line.substring(6);
        if (payload.startsWith("[DONE]")) {
          AILogger.log("stream response is done");
          close();
          return;
        }

        try {
          final decoded = jsonDecode(payload) as Map<String, dynamic>;
          controller.add(onSuccess(decoded));
        } catch (error, stackTrace) {
          // Surface malformed payloads and callback failures to the consumer
          // rather than letting them escape as uncaught zone errors.
          fail(error, stackTrace);
        }
      },
      onError: fail,
      onDone: close,
    );
  }).catchError((Object error, StackTrace stackTrace) {
    // The request itself failed: report it and finish the stream so
    // consumers are not left waiting forever.
    fail(error, stackTrace);
    close();
  });

  return controller.stream;
}