streamBidi method

BidiActionStream<S, O, I> streamBidi({
  1. Stream<I>? inputStream,
  2. StreamingCallback<S>? onChunk,
  3. Map<String, dynamic>? context,
  4. Init? init,
})

Implementation

BidiActionStream<S, O, I> streamBidi({
  Stream<I>? inputStream,
  StreamingCallback<S>? onChunk,
  Map<String, dynamic>? context,
  Init? init,
}) {
  StreamController<I>? internalInputController;
  if (inputStream == null) {
    internalInputController = StreamController<I>();
    inputStream = internalInputController.stream;
  }

  final streamController = StreamController<S>();
  final bidiStream = BidiActionStream<S, O, I>(
    streamController.stream,
    internalInputController?.sink,
  );

  run(
        null, // Pass null for unary input
        onChunk: (chunk) {
          if (!streamController.isClosed) {
            streamController.add(chunk);
          }
          if (onChunk != null) {
            onChunk(chunk);
          }
        },
        context: context,
        inputStream: inputStream,
        init: init,
      )
      .then((result) {
        bidiStream.setResult(result.result);
        if (!streamController.isClosed) {
          streamController.close();
        }
      })
      .catchError((e, s) {
        bidiStream.setError(e, s);
        if (!streamController.isClosed) {
          streamController.addError(e, s);
          streamController.close();
        }
      });

  return bidiStream;
}