stream<I extends Object, O extends Object> method

@override
Future<StreamResponse<I, O>> stream<I extends Object, O extends Object>(
  Spec<I, O> spec,
  Stream<I> input, [
  CallOptions? options,
])
inherited

Calls a streaming RPC: a method that takes zero or more input messages and responds with zero or more output messages.

Implementation

/// Calls a streaming RPC: a method that takes zero or more input messages
/// and responds with zero or more output messages.
///
/// [spec] identifies the procedure to call, [input] supplies the request
/// messages, and [options] optionally carries per-call settings. The
/// signal in [options], if any, becomes the parent of the signal used for
/// this call.
///
/// The request is sent through the configured interceptors when there are
/// any, and straight to the protocol otherwise.
///
/// Any error raised while setting up or starting the call cancels the
/// call's signal with that error and is then rethrown to the caller.
@override
Future<StreamResponse<I, O>> stream<I extends Object, O extends Object>(
  Spec<I, O> spec,
  Stream<I> input, [
  CallOptions? options,
]) async {
  // A per-call signal whose parent is the caller-provided one (if any).
  final signal = CancelableSignal(parent: options?.signal);
  try {
    final url = _baseUrl + spec.procedure;
    final headers = _protocol.requestHeaders(
      spec,
      _codec,
      sendCompression,
      acceptCompressions,
      options,
    );
    final request = StreamRequest(spec, url, headers, input, signal);

    final StreamResponse<I, O> response;
    if (_interceptors.isEmpty) {
      // No interceptors: hand the request straight to the protocol.
      response = await _protocol.stream(
        request,
        _codec,
        _httpClient,
        sendCompression,
        acceptCompressions,
      );
    } else {
      // Wrap the protocol call so it sits at the end of the interceptor
      // chain; the chain is typed loosely, hence the casts on both the
      // incoming request and the resulting response.
      final chain = _interceptors.apply<I, O>(
        (intercepted) async {
          return await _protocol.stream(
            intercepted as StreamRequest<I, O>,
            _codec,
            _httpClient,
            sendCompression,
            acceptCompressions,
          );
        },
      );
      response = await chain(request) as StreamResponse<I, O>;
    }

    // The signal must outlive this function: cancelling it on return
    // would cancel the streaming reads on the response and the streaming
    // writes on the request. It is cancelled instead on the first read
    // error from the response, or once the response completes.
    final messages = response.message.withHooks(
      onError: signal.cancel,
      onDone: signal.cancel,
    );
    return StreamResponse(
      response.spec,
      response.headers,
      messages,
      response.trailers,
    );
  } catch (error) {
    // Setup or call failure: cancel the signal with the error and
    // propagate it unchanged.
    signal.cancel(error);
    rethrow;
  }
}