streamFirebaseFuncionLineByLine function

Future<StreamSubscription<String>> streamFirebaseFuncionLineByLine(
  String functionName,
  Map<String, dynamic> data,
  dynamic onStreamReceived(String), {
  dynamic onStreamDone()?,
  Function? onStreamError,
  bool sendLastLine = false,
})

Implementation

/// Calls the Firebase function [functionName] over HTTP and feeds its response
/// body to [onStreamReceived] one line at a time.
///
/// [data] is JSON-encoded and sent as the body of a `POST` request that carries
/// the current user's ID token as a bearer token.
///
/// Lines are split on `'\n'` and delivered without the separator. Text that
/// follows the final `'\n'` is delivered only when [sendLastLine] is `true` and
/// that text is not empty.
///
/// [onStreamDone] is called when the response ends without an error, after the
/// trailing line (if any) has been handed to [onStreamReceived].
///
/// [onStreamError] is called with the error object when fetching the ID token
/// throws, when no ID token is available, when sending the request throws, or
/// when the response stream reports an error. After a response stream error no
/// further lines are delivered and [onStreamDone] is not called.
///
/// Returns the subscription that delivers the lines. Pausing or resuming it
/// pauses or resumes reading of the response; cancelling it stops reading,
/// closes the HTTP client and suppresses [onStreamDone] and [onStreamError].
/// When the request could not be started, the returned subscription belongs to
/// an empty stream and never delivers anything.
Future<StreamSubscription<String>> streamFirebaseFuncionLineByLine(
  String functionName,
  Map<String, dynamic> data,
  Function(String) onStreamReceived, {
  Function()? onStreamDone,
  Function? onStreamError,
  bool sendLastLine = false,
}) async {
  // Subscription handed back on every early-exit path so that callers always
  // receive a valid object they can cancel.
  StreamSubscription<String> inertSubscription() =>
      Stream<String>.empty().listen((_) {});

  final Uri firebaseFunctionUrl =
      AppConfigBase.firebaseFunctionUri(functionName);

  // Get the current user's ID token.
  String? token;
  try {
    token = await FirebaseAuth.instanceFor(app: AppConfigBase.firebaseApp)
        .currentUser
        ?.getIdToken();
  } catch (e) {
    loge('Error getting ID token: $e');
    onStreamError?.call(e);
    return inertSubscription();
  }

  if (token == null) {
    logd('User not authenticated.');
    onStreamError?.call('User not authenticated.');
    return inertSubscription();
  }

  final request = http.Request('POST', firebaseFunctionUrl)
    ..headers['Content-Type'] = 'application/json'
    ..headers['Authorization'] = 'Bearer $token'
    ..body = jsonEncode(data);

  final client = http.Client();
  http.StreamedResponse streamedResponse;
  try {
    streamedResponse = await client.send(request);
  } catch (e) {
    client.close();
    onStreamError?.call(e);
    return inertSubscription();
  }
  // NOTE(review): streamedResponse.statusCode is not inspected, so the body of
  // a non-2xx response is delivered line by line like any other body —
  // confirm that this is intended.

  // Subscription on the HTTP response. It is assigned below, before the
  // controller gets its listener, so the controller callbacks can reach it.
  StreamSubscription<String>? source;
  var cancelled = false;

  // A synchronous controller hands each line to the listener as part of the
  // `add` call (while the listener is not paused). That keeps the callbacks in
  // order: the trailing line added in `onDone` below reaches
  // [onStreamReceived] before [onStreamDone] runs.
  final controller = StreamController<String>(
    sync: true,
    // Forward pause/resume so a paused consumer stops the download instead of
    // letting lines pile up in memory.
    onPause: () => source?.pause(),
    onResume: () => source?.resume(),
    onCancel: () {
      cancelled = true;
      client.close();
      // Stop reading the response as well; otherwise the source subscription
      // would keep running with nobody listening.
      return source?.cancel();
    },
  );

  // Text received so far that has not yet been terminated by '\n'.
  var pending = '';
  source = streamedResponse.stream.transform(utf8.decoder).listen(
    (chunk) {
      final text = pending + chunk;
      var start = 0;
      int newline;
      // Walk the text once instead of re-slicing the buffer for every line.
      while (!cancelled && (newline = text.indexOf('\n', start)) != -1) {
        controller.add(text.substring(start, newline));
        start = newline + 1;
      }
      pending = text.substring(start);
    },
    onDone: () {
      try {
        if (!cancelled) {
          if (sendLastLine && pending.isNotEmpty) {
            controller.add(pending);
          }
          onStreamDone?.call();
        }
      } finally {
        client.close();
        controller.close();
      }
    },
    onError: (Object error) {
      try {
        if (!cancelled) onStreamError?.call(error);
      } finally {
        client.close();
        controller.close();
      }
    },
    // The controller is closed on the first error, so the source must stop
    // there too: a later chunk would otherwise be added to a closed
    // controller, and a later done event would report success after a failure.
    cancelOnError: true,
  );

  return controller.stream.listen(onStreamReceived);
}