handleRpcCall<O, R> method

Future<ResponsePacket?> handleRpcCall<O, R>(
  1. MessagePacket packet,
  2. TransportAdapter<dynamic, TransportOptions> adapter
)

Handles an incoming gRPC call by setting up the Serinus context and invoking the appropriate service method.

Implementation

/// Handles an incoming gRPC call described by [packet].
///
/// Looks up the route registered under `packet.id` in
/// `resolvedMessageRoutes`, builds an [ExecutionContext] for it and then, in
/// order: runs the request hooks, merges the route metadata (when the route
/// declares any), runs the pipes and runs the before-handle hooks. Afterwards
/// the packet payload is cast to `GrpcInvocationPayload<O, R>`, the RPC
/// context is stored in `grpcContexts` under `payload.call`, and
/// `payload.invoker` is called with the call, method and requests.
///
/// Returns a [ResponsePacket] whose payload is whatever the invoker returned.
///
/// If an [RpcException] is thrown along the way, the first exception filter
/// whose `catchTargets` is empty or contains the exception's runtime type is
/// invoked and `null` is returned. When no filter matches (including when the
/// route has no filters at all), an error [ResponsePacket] carrying the
/// exception message is returned instead.
///
/// Throws the error produced by `GrpcError.notFound` when no route is
/// registered for `packet.id`. Exceptions other than [RpcException] are not
/// caught here.
///
/// [adapter] is accepted as part of the signature but is not used by this
/// method.
Future<ResponsePacket?> handleRpcCall<O, R>(
  MessagePacket packet,
  TransportAdapter adapter,
) async {
  final routeContext = resolvedMessageRoutes[packet.id];
  if (routeContext == null) {
    throw GrpcError.notFound(
      'No message route found for service "${packet.id}"',
    );
  }
  try {
    final executionContext = ExecutionContext(
      HostType.rpc,
      routeContext.providers,
      routeContext.values,
      routeContext.hooks.services,
      RpcArgumentsHost(packet),
    );
    for (final hook in routeContext.hooks.reqHooks) {
      await hook.onRequest(executionContext);
    }
    if (routeContext.metadata.isNotEmpty) {
      executionContext.metadata.addAll(
        await routeContext.initMetadata(executionContext),
      );
    }
    for (final pipe in routeContext.pipes) {
      await pipe.transform(executionContext);
    }
    for (final beforeHook in routeContext.hooks.beforeHooks) {
      await beforeHook.beforeHandle(executionContext);
    }

    final payload = packet.payload as GrpcInvocationPayload<O, R>;
    final rpcContext = executionContext.switchToRpc();
    // Registered per call so the service method can reach the context.
    // NOTE(review): no removal from `grpcContexts` is visible in this method;
    // confirm the entry is cleaned up elsewhere once the call finishes.
    grpcContexts[payload.call] = rpcContext;

    // Invoke the service method via the gRPC invoker.
    // The invoker calls the actual service method, which can access
    // providers and other context via `call.context`.
    final resultStream = payload.invoker(
      payload.call,
      payload.method,
      payload.requests,
    );

    return ResponsePacket(
      pattern: packet.pattern,
      id: packet.id,
      payload: resultStream,
    );
  } on RpcException catch (e) {
    // Check every filter, not just the first one: only the first matching
    // filter handles the exception.
    for (final filter in routeContext.exceptionFilters) {
      final targets = filter.catchTargets;
      if (targets.isEmpty || targets.contains(e.runtimeType)) {
        // A fresh context is built for the filter, as the one created inside
        // the `try` block is out of scope here.
        final filterContext = ExecutionContext(
          HostType.rpc,
          routeContext.providers,
          routeContext.values,
          routeContext.hooks.services,
          RpcArgumentsHost(packet),
        );
        await filter.onException(filterContext, e);
        // The filter took care of the exception; no packet is produced.
        return null;
      }
    }
    // No filter matched (or none is registered): report the error to the
    // caller rather than swallowing it.
    return ResponsePacket(
      pattern: packet.pattern,
      id: packet.id,
      isError: true,
      payload: {'error': e.message},
    );
  }
}