getGoStream<TResponse extends GeneratedMessage> function
Stream<TResponse> getGoStream<TResponse extends GeneratedMessage>({
  required GeneratedMessage request,
  required void goFunc(int port, Pointer<UnsignedChar> buffer, int size),
  required TResponse responseToFill,
  List<GeneratedMessage>? errorsToThrow,
})
Implementation
/// Starts [goFunc] with [request] and exposes the messages it posts back as a
/// single-subscription [Stream].
///
/// A [ReceivePort] is opened and handed to `_doCall` together with [request]
/// and [goFunc]. Each message arriving on that port is decoded with
/// `_getResponseMessage` and then handled in this order:
///
/// * if `_getError` returns a non-null value for it (given [errorsToThrow]),
///   that value is added to the stream as an error and the stream stays open;
/// * if `_isEmpty` reports true for it, the stream is closed;
/// * if it can be unpacked into the type of [responseToFill], a freshly
///   created message of that type is populated and emitted;
/// * otherwise the error `'unsupported type of message'` is added.
///
/// [responseToFill] is used only as a type template: each event is a new
/// instance obtained from `createEmptyInstance()`, so events never alias each
/// other and data from one message cannot be merged into the next.
///
/// The receive port is closed once the stream's sink reports done. If
/// `_doCall` throws synchronously, the port is closed and the error is
/// rethrown to the caller.
Stream<TResponse> getGoStream<TResponse extends GeneratedMessage>({
  required GeneratedMessage request,
  required void Function(int port, Pointer<UnsignedChar> buffer, int size)
      goFunc,
  required TResponse responseToFill,
  List<GeneratedMessage>? errorsToThrow,
}) {
  final controller = StreamController<TResponse>();
  final sink = controller.sink;
  final receivePort = ReceivePort();

  // Release the native-facing port as soon as the sink is done.
  sink.done.then((_) => receivePort.close());

  receivePort.listen((data) {
    // Messages that arrive after the stream was closed are dropped.
    if (controller.isClosed) {
      receivePort.close();
      return;
    }

    final msg = _getResponseMessage(data);

    final error = _getError(errorsToThrow, msg);
    if (error != null) {
      sink.addError(error);
      return;
    }

    // An empty message closes the stream.
    if (_isEmpty(msg)) {
      sink.close();
      return;
    }

    if (msg.canUnpackInto(responseToFill)) {
      // Unpack into a fresh instance: unpacking merges into its target, so
      // reusing `responseToFill` would mutate already-delivered events and
      // carry fields over from previous messages.
      final response = responseToFill.createEmptyInstance() as TResponse;
      msg.unpackInto(response);
      sink.add(response);
      return;
    }

    sink.addError('unsupported type of message');
  });

  try {
    _doCall(request, goFunc, receivePort);
  } catch (_) {
    // Nothing will ever post to the port if the call could not be started;
    // close it so it does not stay open indefinitely.
    receivePort.close();
    rethrow;
  }

  return controller.stream;
}