rpc_dart 3.0.1

Transport-agnostic RPC framework with contracts, streaming and included transports.

RPC Dart

Transport-agnostic RPC framework for Dart. Implements the gRPC wire protocol — works over HTTP/2, WebSocket, Isolates, and in-memory without code changes.


Core concepts

  • Contract — service name and method identifiers defining the API.
  • Responder — server side: registers methods and handles requests.
  • Caller — client side: invokes methods via an endpoint.
  • Endpoint — connection point that wraps a transport.
  • Transport — message transport (InMemory, Isolate, HTTP/2, WebSocket, etc.).
  • Codec — optional serializer/deserializer for requests and responses.

Key features

  • Unary, server streaming, client streaming, bidirectional streaming.
  • Zero-copy in-process transport (RpcInMemoryTransport).
  • 3-layer transport architecture: IRpcChannel → IRpcMultiplexedChannel → IRpcTransport.
  • Resilience: retry, circuit breaker, rate limiter, reconnect state machine.
  • gRPC Health Checking Protocol (grpc.health.v1).
  • RpcBinaryCodec for protobuf and other binary formats.
  • RpcContext with trace id, headers, deadline, cancellation.
  • Pure Dart core — no external runtime dependencies.

Quick start

Define a contract:

abstract interface class ICalculatorContract {
  static const name = 'Calculator';
  static const methodSum = 'sum';
}

class SumRequest {
  final List<double> values;
  SumRequest(this.values);
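  factory SumRequest.fromJson(Map<String, dynamic> j) => SumRequest(
        // JSON numbers without a fraction decode as int, so go through num.
        (j['values'] as List).map((v) => (v as num).toDouble()).toList(),
      );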
  Map<String, dynamic> toJson() => {'values': values};
}

class SumResponse {
  final double result;
  SumResponse(this.result);
  factory SumResponse.fromJson(Map<String, dynamic> j) =>
      SumResponse((j['result'] as num).toDouble());
  Map<String, dynamic> toJson() => {'result': result};
}
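
JSON has a single number type, so a payload written by another client may carry `[1, 2, 3]` rather than `[1.0, 2.0, 3.0]`, and `dart:convert` decodes whole numbers as `int`. The sketch below shows one defensive way to decode such a field. The helper names `toDoubleList` and `decodeValues` are hypothetical and not part of rpc_dart.

```dart
import 'dart:convert';

/// Converts a decoded JSON array of numbers into a List<double>.
/// Hypothetical helper, not part of rpc_dart.
List<double> toDoubleList(Object? json) =>
    (json as List).map((v) => (v as num).toDouble()).toList();

/// Decodes the `values` field of a JSON-encoded request body.
List<double> decodeValues(String body) {
  final map = jsonDecode(body) as Map<String, dynamic>;
  return toDoubleList(map['values']);
}
```

Going through `num` accepts both `int` and `double` elements, whereas a direct cast to `double` throws on the Dart VM when an element was decoded as `int`.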

Implement the responder:

class CalculatorResponder extends RpcResponderContract {
  CalculatorResponder() : super(ICalculatorContract.name) {
    addUnaryMethod<SumRequest, SumResponse>(
      methodName: ICalculatorContract.methodSum,
      requestCodec: RpcCodec.withDecoder(SumRequest.fromJson),
      responseCodec: RpcCodec.withDecoder(SumResponse.fromJson),
      handler: (req, {context}) async {
        final total = req.values.fold<double>(0, (a, b) => a + b);
        return SumResponse(total);
      },
    );
  }
}

Run with in-memory transport:

Future<void> main() async {
  final (clientTransport, serverTransport) = RpcInMemoryTransport.pair();

  final responder = RpcResponderEndpoint(transport: serverTransport);
  responder.registerServiceContract(CalculatorResponder());
  responder.start();

  final caller = RpcCallerEndpoint(transport: clientTransport);
  final res = await caller.callUnary<SumRequest, SumResponse>(
    serviceName: ICalculatorContract.name,
    methodName: ICalculatorContract.methodSum,
    requestCodec: RpcCodec.withDecoder(SumRequest.fromJson),
    responseCodec: RpcCodec.withDecoder(SumResponse.fromJson),
    request: SumRequest([1, 2, 3]),
  );
  print(res.result); // 6.0

  await caller.close();
  await responder.close();
}

Use rpc_dart_generator to generate caller/responder boilerplate from annotated Dart interfaces.


Protobuf / binary codecs

RpcBinaryCodec works with any type — use it for protobuf or any custom binary format:

final reqCodec = RpcBinaryCodec<MyRequest>(
  toBytes: (r) => r.writeToBuffer(),
  fromBytes: MyRequest.fromBuffer,
);
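
For a custom binary format, the two callbacks are simply a pair of inverse functions between a value and its bytes. A minimal sketch, assuming a hypothetical `Point` message with a fixed 16-byte layout (none of these names come from rpc_dart):

```dart
import 'dart:typed_data';

/// Hypothetical message type used only for this illustration.
class Point {
  final double x;
  final double y;
  const Point(this.x, this.y);
}

/// Encodes a Point as two big-endian 64-bit floats (16 bytes).
Uint8List pointToBytes(Point p) {
  final data = ByteData(16)
    ..setFloat64(0, p.x)
    ..setFloat64(8, p.y);
  return data.buffer.asUint8List();
}

/// Decodes the 16-byte layout written by [pointToBytes].
Point pointFromBytes(List<int> bytes) {
  if (bytes.length != 16) {
    throw FormatException('expected 16 bytes, got ${bytes.length}');
  }
  final data = ByteData.sublistView(Uint8List.fromList(bytes));
  return Point(data.getFloat64(0), data.getFloat64(8));
}
```

Functions of this shape could be passed as `toBytes` and `fromBytes`. Whether the codec expects `Uint8List` or `List<int>` is an assumption to check against the API reference.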

Resilience

Retry

final caller = RpcCallerEndpoint(
  transport: transport,
  interceptors: [
    RpcRetryInterceptor(
      maxAttempts: 3,
      backoff: BackoffPolicy.exponential(
        initial: Duration(milliseconds: 100),
        multiplier: 2,
      ),
    ),
  ],
);
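
To make the numbers concrete: assuming the policy multiplies the delay by `multiplier` after each failed attempt, and that `maxAttempts` counts the initial call, the configuration above would wait roughly 100 ms and then 200 ms before giving up. The sketch below reproduces that schedule in plain Dart. `backoffDelay` and `retry` are hypothetical helpers, not the library's implementation, and they leave out jitter, which real policies often add.

```dart
import 'dart:math' as math;

/// Delay before retry number [attempt] (1-based: 1 is the first retry).
/// Grows as initial * multiplier^(attempt - 1), capped at [max].
Duration backoffDelay(
  int attempt, {
  Duration initial = const Duration(milliseconds: 100),
  double multiplier = 2,
  Duration max = const Duration(seconds: 10),
}) {
  final micros = initial.inMicroseconds * math.pow(multiplier, attempt - 1);
  return Duration(
    microseconds: math.min(micros, max.inMicroseconds).round(),
  );
}

/// Runs [action] up to [maxAttempts] times, sleeping between failures.
/// The last failure is rethrown to the caller.
Future<T> retry<T>(
  Future<T> Function() action, {
  int maxAttempts = 3,
  Duration Function(int attempt) delayFor = backoffDelay,
}) async {
  for (var attempt = 1;; attempt++) {
    try {
      return await action();
    } catch (_) {
      if (attempt >= maxAttempts) rethrow;
      await Future<void>.delayed(delayFor(attempt));
    }
  }
}
```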

Circuit breaker

RpcCircuitBreakerInterceptor(
  failureThreshold: 5,
  resetTimeout: Duration(seconds: 30),
)
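
The two parameters map onto the usual circuit breaker state machine: the breaker opens after `failureThreshold` failures, rejects calls while open, and lets a trial call through once `resetTimeout` has elapsed. The following is a minimal sketch of that pattern, assuming the interceptor follows it. `SimpleCircuitBreaker` is a hypothetical class, and counting details (consecutive versus total failures) may differ in rpc_dart.

```dart
enum BreakerState { closed, open, halfOpen }

/// Minimal circuit breaker; hypothetical, not the rpc_dart implementation.
class SimpleCircuitBreaker {
  SimpleCircuitBreaker({
    this.failureThreshold = 5,
    this.resetTimeout = const Duration(seconds: 30),
    DateTime Function()? clock,
  }) : _now = clock ?? DateTime.now;

  final int failureThreshold;
  final Duration resetTimeout;
  final DateTime Function() _now; // Injectable so tests can control time.

  int _failures = 0;
  DateTime? _openedAt;

  BreakerState get state {
    final openedAt = _openedAt;
    if (openedAt == null) return BreakerState.closed;
    // After the timeout, let a trial call through.
    return _now().difference(openedAt) >= resetTimeout
        ? BreakerState.halfOpen
        : BreakerState.open;
  }

  /// Whether a call may be attempted right now.
  bool get allowsCall => state != BreakerState.open;

  void recordSuccess() {
    _failures = 0;
    _openedAt = null;
  }

  void recordFailure() {
    if (state == BreakerState.halfOpen) {
      _openedAt = _now(); // Trial call failed: re-open immediately.
      return;
    }
    _failures++;
    if (_failures >= failureThreshold) _openedAt = _now();
  }
}
```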

Client connection with reconnect

final connection = RpcClientConnection(
  transportFactory: () => RpcHttp2Transport(...),
  reconnectPolicy: BackoffPolicy.exponential(...),
);
await connection.connect();

Rate limiter

final limiter = RpcRateLimiter(
  maxRequests: 100,
  window: Duration(seconds: 1),
  keyExtractor: (context) => context.header('user-id'),
);
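
One plausible reading of `maxRequests`, `window` and `keyExtractor` is a per-key counter that resets every window. The sketch below implements that fixed-window variant in plain Dart. `FixedWindowLimiter` is a hypothetical class, and the library may use a different algorithm, such as a sliding window or a token bucket.

```dart
/// Fixed-window rate limiter keyed by an arbitrary string.
/// Hypothetical sketch; rpc_dart's RpcRateLimiter may work differently.
class FixedWindowLimiter {
  FixedWindowLimiter({
    required this.maxRequests,
    required this.window,
    DateTime Function()? clock,
  }) : _now = clock ?? DateTime.now;

  final int maxRequests;
  final Duration window;
  final DateTime Function() _now; // Injectable so tests can control time.

  final _windowStart = <String, DateTime>{};
  final _counts = <String, int>{};

  /// Returns true and counts the request if [key] is under its limit.
  bool tryAcquire(String key) {
    final now = _now();
    final start = _windowStart[key];
    if (start == null || now.difference(start) >= window) {
      // First request for this key, or the previous window has expired.
      _windowStart[key] = now;
      _counts[key] = 0;
    }
    final used = _counts[key]!;
    if (used >= maxRequests) return false;
    _counts[key] = used + 1;
    return true;
  }
}
```

Each key, for example a user id taken from a request header, gets its own budget, so one noisy caller does not exhaust the limit for everyone else.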

gRPC Health Checking

Implements the standard grpc.health.v1.Health protocol:

final health = RpcGrpcHealthService();
health.setStatus('MyService', ServingStatus.serving);
responder.registerServiceContract(health);

// Client:
final client = RpcGrpcHealthClient(caller);
final status = await client.check('MyService');
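
For orientation, the protocol defines four serving statuses with fixed numeric values, and the empty service name conventionally refers to the server as a whole. The sketch below models that status table in plain Dart. `HealthStatus` and `HealthRegistry` are hypothetical names for illustration and are not the rpc_dart types.

```dart
/// Values of grpc.health.v1.HealthCheckResponse.ServingStatus.
enum HealthStatus {
  unknown(0),
  serving(1),
  notServing(2),
  serviceUnknown(3); // Used only by the Watch method.

  const HealthStatus(this.wireValue);
  final int wireValue;
}

/// In-memory status table; hypothetical, for illustration only.
class HealthRegistry {
  // The empty service name conventionally means "the server as a whole".
  final _statuses = <String, HealthStatus>{'': HealthStatus.serving};

  void setStatus(String service, HealthStatus status) =>
      _statuses[service] = status;

  /// Returns null for an unregistered service; a real Check call answers
  /// that case with a NOT_FOUND gRPC status.
  HealthStatus? check(String service) => _statuses[service];
}
```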

Transport architecture

Version 3.0 introduces a 3-layer architecture:

IRpcChannel              — raw byte pipe (WebSocket, TCP, etc.)
IRpcMultiplexedChannel   — multiplexed framed messages
IRpcTransport            — full transport with stream IDs and health
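
To illustrate what "framed messages" means on a raw byte pipe, the sketch below uses the standard gRPC length-prefixed message layout: a 1-byte compressed flag, a 4-byte big-endian length, then the payload. It is an assumption that the middle layer uses exactly this layout internally. `frameMessage` and `unframeMessages` are hypothetical helpers, and the decoder ignores the compressed flag.

```dart
import 'dart:typed_data';

/// Wraps [payload] in a gRPC length-prefixed message:
/// 1 byte compressed flag, 4 bytes big-endian length, then the payload.
Uint8List frameMessage(List<int> payload, {bool compressed = false}) {
  final out = Uint8List(5 + payload.length);
  out[0] = compressed ? 1 : 0;
  ByteData.sublistView(out).setUint32(1, payload.length);
  out.setRange(5, out.length, payload);
  return out;
}

/// Splits a buffer holding zero or more complete frames into payloads.
/// Throws if the buffer ends in the middle of a frame.
List<Uint8List> unframeMessages(Uint8List buffer) {
  final view = ByteData.sublistView(buffer);
  final messages = <Uint8List>[];
  var offset = 0;
  while (offset < buffer.length) {
    if (buffer.length - offset < 5) {
      throw const FormatException('truncated frame header');
    }
    final length = view.getUint32(offset + 1);
    final start = offset + 5;
    if (start + length > buffer.length) {
      throw const FormatException('truncated frame payload');
    }
    // A view into the original buffer, so no payload bytes are copied.
    messages.add(Uint8List.sublistView(buffer, start, start + length));
    offset = start + length;
  }
  return messages;
}
```

The length prefix is what lets a receiver recover message boundaries from a stream of bytes that may arrive in arbitrary chunks.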

Convenience factories on RpcChannelTransport:

// Zero-copy in-memory pair (tests, isolates):
final (t1, t2) = RpcChannelTransport.memoryPair();

// Frame-based pair (exercises full codec path):
final (f1, f2) = RpcChannelTransport.pair();

// Wrap any IRpcChannel (WebSocket, TCP):
final transport = RpcChannelTransport.fromChannel(myChannel);

Health monitoring

final report = await caller.health();
if (!report.isHealthy) {
  print('issue: ${report.transportStatus?.message}');
  await caller.reconnect();
}

Cancellation and deadlines

final token = RpcCancellationToken();
final ctx = RpcContext
    .withCancellation(token)
    .withTimeout(Duration(seconds: 2));

// In the handler, cooperate:
Future<Response> handle(Request req, {RpcContext? context}) async {
  for (final item in items) {
    context?.cancellationToken?.throwIfCancelled();
    await process(item);
  }
  return Response(...);
}

token.cancel('user cancelled');
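
Cancellation here is cooperative: cancelling the token only sets a flag, and the handler stops at the next point where it checks that flag. The sketch below shows the mechanism in plain Dart. `SimpleCancellationToken`, `CancelledException` and `sumCooperatively` are hypothetical names and do not describe the library's own classes.

```dart
/// Thrown by [SimpleCancellationToken.throwIfCancelled].
class CancelledException implements Exception {
  CancelledException(this.reason);
  final String reason;

  @override
  String toString() => 'CancelledException: $reason';
}

/// Hypothetical token, not the rpc_dart RpcCancellationToken.
class SimpleCancellationToken {
  String? _reason;

  bool get isCancelled => _reason != null;

  /// Marks the token as cancelled; later calls keep the first reason.
  void cancel([String reason = 'cancelled']) => _reason ??= reason;

  void throwIfCancelled() {
    final reason = _reason;
    if (reason != null) throw CancelledException(reason);
  }
}

/// Sums [items], checking the token before each one.
Future<int> sumCooperatively(
  List<int> items,
  SimpleCancellationToken token,
) async {
  var total = 0;
  for (final item in items) {
    token.throwIfCancelled(); // Cooperative check point.
    await Future<void>.delayed(Duration.zero); // Stand-in for real work.
    total += item;
  }
  return total;
}
```

Work that never reaches a check point keeps running after `cancel`, so long loops should check the token on every iteration.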

Error handling

Return specific gRPC status codes from handlers:

Future<Response> handle(Request req, {RpcContext? context}) async {
  if (!authorized) throw RpcStatusException(StatusCode.unauthenticated, 'Not authorized');
  return Response(...);
}
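
On the wire, gRPC reports the outcome of a call as a numeric code in the `grpc-status` trailer, with an optional `grpc-message` text. The sketch below lists the canonical codes and parses a trailer value. `GrpcStatus` and `statusFromTrailer` are hypothetical names for illustration. The library's own `StatusCode` type presumably covers the same set, but that is an assumption.

```dart
/// Canonical gRPC status codes. Declaration order matches the numeric
/// codes, so `index` is the value sent in the `grpc-status` trailer.
enum GrpcStatus {
  ok, // 0
  cancelled, // 1
  unknown, // 2
  invalidArgument, // 3
  deadlineExceeded, // 4
  notFound, // 5
  alreadyExists, // 6
  permissionDenied, // 7
  resourceExhausted, // 8
  failedPrecondition, // 9
  aborted, // 10
  outOfRange, // 11
  unimplemented, // 12
  internal, // 13
  unavailable, // 14
  dataLoss, // 15
  unauthenticated, // 16
}

/// Looks up a code from the value of a `grpc-status` trailer.
/// Missing, malformed or out-of-range values map to [GrpcStatus.unknown].
GrpcStatus statusFromTrailer(String? value) {
  final code = int.tryParse(value ?? '');
  if (code == null || code < 0 || code >= GrpcStatus.values.length) {
    return GrpcStatus.unknown;
  }
  return GrpcStatus.values[code];
}
```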

Testing

test('sum', () async {
  final (ct, st) = RpcInMemoryTransport.pair();
  final responder = RpcResponderEndpoint(transport: st)
    ..registerServiceContract(CalculatorResponder())
    ..start();
  final caller = RpcCallerEndpoint(transport: ct);

  final res = await caller.callUnary(...);
  expect(res.result, 6.0);

  await caller.close();
  await responder.close();
});

Ecosystem

This package is the core of the rpc_dart ecosystem. Additional packages are available:

Package                    Description
rpc_dart_generator         Code generator for callers and responders
rpc_dart_framework         Server framework: modules, DI, lifecycle, rate limiting
rpc_dart_grpc_reflection   gRPC Server Reflection (grpcurl, Postman support)
rpc_dart_opentelemetry     OpenTelemetry tracing and metrics
rpc_dart_http2             HTTP/2 transport (gRPC wire compatible)
rpc_dart_websocket         WebSocket transport
rpc_dart_isolate           Isolate transport

For the full list visit the GitHub repository.

Publisher

dart.nogipx.dev (verified publisher)

License

MIT (license)