callStreamingServerEndpoint<T, G> method

@override
dynamic callStreamingServerEndpoint<T, G>(
  String endpoint,
  String method,
  Map<String, dynamic> args,
  Map<String, Stream> streams, {
  bool authenticated = true,
})
override

Calls a server endpoint method that supports streaming. The streams parameter is a map of stream names to stream objects. The method will listen to the streams and send the data to the server. Typically, this method is called by generated code.

T is the type of the return value of the endpoint stream. This is either a Stream or a Future.

G is the type argument of T; that is, T is either Stream<G> or Future<G>.

If T is neither Stream<G> nor Future<G>, the method throws an UnsupportedError.

Implementation

/// Calls a server endpoint method that supports streaming.
///
/// [endpoint], [method] and [args] identify the call, and [streams] is
/// forwarded as the connection's parameter streams. Typically, this method is
/// called by generated code.
///
/// [T] is the declared return type and must be either `Stream<G>` or
/// `Future<G>`; [G] is its type argument.
///
/// Returns:
/// * the output stream, when [T] is `Stream<G>`;
/// * a future that completes when the output stream is done, when [T] is
///   `Future<G>` and [G] is `void`;
/// * a future that completes with the first event of the output stream, when
///   [T] is any other `Future<G>`.
///
/// When [authenticated] is `false`, no auth key provider is passed to the
/// connection.
///
/// Throws an [UnsupportedError] if [T] is neither `Stream<G>` nor
/// `Future<G>`. The check runs before any connection is opened, so an
/// unsupported call leaves no open method stream behind.
@override
dynamic callStreamingServerEndpoint<T, G>(
  String endpoint,
  String method,
  Map<String, dynamic> args,
  Map<String, Stream> streams, {
  bool authenticated = true,
}) {
  // Validate the requested return type up front: failing after the method
  // stream has been opened would leave an unlistened, never-closed controller
  // and an open connection behind.
  final returnsStream = T == Stream<G>;
  final returnsFuture = T == Future<G>;
  if (!returnsStream && !returnsFuture) {
    throw UnsupportedError('Unsupported type $T');
  }

  var connectionDetails = MethodStreamConnectionDetails(
    endpoint: endpoint,
    method: method,
    args: args,
    parameterStreams: streams,
    outputController: StreamController<G>(),
    authKeyProvider: authenticated ? authKeyProvider : null,
  );
  final outputController = connectionDetails.outputController;

  // The returned future is intentionally not awaited: a failure to open the
  // stream is reported to the caller through the output controller instead.
  _methodStreamManager.openMethodStream(connectionDetails).catchError((e, _) {
    Object error;
    if (e is OpenMethodStreamException) {
      // Translate the open-stream response into the matching client exception.
      error = switch (e.responseType) {
        OpenMethodStreamResponseType.endpointNotFound =>
          ServerpodClientNotFound(),
        OpenMethodStreamResponseType.authenticationFailed =>
          ServerpodClientUnauthorized(),
        OpenMethodStreamResponseType.authorizationDeclined =>
          ServerpodClientForbidden(),
        OpenMethodStreamResponseType.invalidArguments =>
          ServerpodClientBadRequest(),
        OpenMethodStreamResponseType.success => ServerpodClientException(
          'Unknown error, data: $e',
          -1,
        ),
      };
    } else {
      error = e;
    }

    outputController.addError(error);
    outputController.close();
  });

  if (returnsStream) {
    return outputController.stream;
  }

  // From here on T is Future<G>.
  if (G == getType<void>()) {
    var result = Completer<void>();
    // Listen to stream so that close can be called when method has returned.
    // cancelOnError ensures the completer is completed at most once: after an
    // error the subscription is cancelled and onDone is not delivered.
    outputController.stream.listen(
      (e) {},
      onError: ((e, _) => result.completeError(e)),
      onDone: () => result.complete(),
      cancelOnError: true,
    );
    return result.future;
  }

  // Non-void future: the first event on the output stream is the result.
  var result = Completer<G>();
  outputController.stream.first.then(
    (e) {
      result.complete(e);
    },
    onError: (e, _) {
      result.completeError(e);
    },
  );
  return result.future;
}