stream<I extends Object, O extends Object> method
Future<StreamResponse<I, O>> stream<I extends Object, O extends Object>(
  Spec<I, O> spec,
  Stream<I> input, [
  CallOptions? options,
])
inherited
Call a streaming RPC - a method that takes zero or more input messages, and responds with zero or more output messages.
Implementation
/// Calls the streaming RPC described by [spec], sending [input] as the
/// stream of request messages.
///
/// A [CancelableSignal] whose parent is `options?.signal` is created and
/// attached to the outgoing [StreamRequest]. When no interceptors are
/// registered the request goes straight to `_protocol.stream`; otherwise it
/// is passed to the function returned by `_interceptors.apply`, which wraps
/// that same protocol call.
///
/// The returned [StreamResponse] carries the spec, headers and trailers of
/// the underlying response, with its message stream wrapped in hooks that
/// cancel the signal on error or on completion.
///
/// If anything throws before the response is returned, the signal is
/// cancelled with that error and the error is rethrown.
@override
Future<StreamResponse<I, O>> stream<I extends Object, O extends Object>(
  Spec<I, O> spec,
  Stream<I> input, [
  CallOptions? options,
]) async {
  final signal = CancelableSignal(parent: options?.signal);
  try {
    // Build the request pieces in the order they are consumed: URL first,
    // then the protocol-specific headers.
    final url = _baseUrl + spec.procedure;
    final headers = _protocol.requestHeaders(
      spec,
      _codec,
      sendCompression,
      acceptCompressions,
      options,
    );
    final request = StreamRequest(spec, url, headers, input, signal);

    final StreamResponse<I, O> response;
    if (_interceptors.isNotEmpty) {
      // Wrap the protocol call so the interceptors run around it.
      final intercepted = _interceptors.apply<I, O>(
        (next) async => await _protocol.stream(
          next as StreamRequest<I, O>,
          _codec,
          _httpClient,
          sendCompression,
          acceptCompressions,
        ),
      );
      response = await intercepted(request) as StreamResponse<I, O>;
    } else {
      response = await _protocol.stream(
        request,
        _codec,
        _httpClient,
        sendCompression,
        acceptCompressions,
      );
    }

    // The signal must outlive this function: cancelling it on return would
    // abort the still-running streaming reads on the response and writes on
    // the request. It is cancelled instead once the response message stream
    // reports its first error or finishes.
    return StreamResponse(
      response.spec,
      response.headers,
      response.message.withHooks(
        onError: signal.cancel,
        onDone: signal.cancel,
      ),
      response.trailers,
    );
  } catch (error) {
    // Failure before the response was handed back: cancel the signal with
    // the error, then propagate it.
    signal.cancel(error);
    rethrow;
  }
}