getGoStream<TResponse extends GeneratedMessage> function

Stream<TResponse> getGoStream<TResponse extends GeneratedMessage>({
  1. required GeneratedMessage request,
  2. required void goFunc(
    1. int port,
    2. Pointer<UnsignedChar> buffer,
    3. int size
    ),
  3. required TResponse responseToFill,
  4. List<GeneratedMessage>? errorsToThrow,
})

Implementation

Stream<TResponse> getGoStream<TResponse extends GeneratedMessage>({
  required GeneratedMessage request,
  required void Function(int port, Pointer<UnsignedChar> buffer, int size)
      goFunc,
  required TResponse responseToFill,
  List<GeneratedMessage>? errorsToThrow,
}) {
  final sc = StreamController<TResponse>();
  final sink = sc.sink;
  final receivePort = ReceivePort();
  sc.sink.done.then(
    (value) {
      receivePort.close();
    },
  );

  receivePort.listen((data) {
    if (sc.isClosed) {
      receivePort.close();
      return;
    }
    final msg = _getResponseMessage(data);
    final error = _getError(errorsToThrow, msg);
    if (error != null) {
      sink.addError(error);
      return;
    }
    if (_isEmpty(msg)) {
      sink.close();
      return;
    }

    if (msg.canUnpackInto(responseToFill)) {
      msg.unpackInto(responseToFill);
      sink.add(responseToFill);
      return;
    }
    sink.addError('unsupported type of message');
  });

  _doCall(request, goFunc, receivePort);

  return sc.stream;
}