rpc_dart 2.0.0
rpc_dart: ^2.0.0 copied to clipboard
gRPC-inspired library built on pure Dart
RPC Dart
Pure Dart RPC library for type-safe communication between components
Core Concepts #
RPC Dart is built on the following key concepts:
- Contracts β define service APIs through interfaces and methods
- Responder β server side, handles incoming requests
- Caller β client side, sends requests
- Endpoint β connection point, manages transport
- Transport β data transmission layer (InMemory, Isolate, HTTP)
- Codec β message serialization/deserialization
Key Features #
- Full RPC pattern support β unary calls, server streams, client streams, bidirectional streams
- Zero-Copy InMemory transport β object transport without serialization in single process
- Type safety β all requests/responses are strictly typed
- Automatic tracing β trace ID generated automatically, passed through RpcContext
- No external dependencies β pure Dart only
- Built-in primitives β ready wrappers for String, Int, Double, Bool, List
- Easy testing β with InMemory transport and mocks
CORD #
RPC Dart offers CORD (Contract-Oriented Remote Domains) β an architectural approach for structuring business logic through isolated domains with type-safe RPC contracts.
π Learn more
Quick Start #
π Full usage examples
1. Define contract and models #
// Contract with constants
abstract interface class ICalculatorContract {
static const name = 'Calculator';
static const methodCalculate = 'calculate';
}
// Zero-copy models β regular classes
class Request {
final double a, b;
final String op;
Request(this.a, this.b, this.op);
}
class Response {
final double result;
Response(this.result);
}
2. Server (Responder) #
final class CalculatorResponder extends RpcResponderContract {
CalculatorResponder() : super(ICalculatorContract.name);
@override
void setup() {
// Zero-copy method β DON'T specify codecs
addUnaryMethod<Request, Response>(
methodName: ICalculatorContract.methodCalculate,
handler: calculate,
);
}
Future<Response> calculate(Request req, {RpcContext? context}) async {
final result = switch (req.op) {
'add' => req.a + req.b,
_ => 0.0,
};
return Response(result);
}
}
3. Client (Caller) #
final class CalculatorCaller extends RpcCallerContract {
CalculatorCaller(RpcCallerEndpoint endpoint)
: super(ICalculatorContract.name, endpoint);
Future<Response> calculate(Request request) {
return callUnary<Request, Response>(
methodName: ICalculatorContract.methodCalculate,
request: request,
);
}
}
4. Usage #
void main() async {
// Create inmemory transport
final (client, server) = RpcInMemoryTransport.pair();
// Setup endpoints
final responder = RpcResponderEndpoint(transport: server);
final caller = RpcCallerEndpoint(transport: client);
// Register service
responder.registerServiceContract(CalculatorResponder());
responder.start();
final calculator = CalculatorCaller(caller);
// Call RPC method
final result = await calculator.calculate(Request(10, 5, 'add'));
print('10 + 5 = ${result.result}'); // 10 + 5 = 15.0
// Cleanup
await caller.close();
await responder.close();
}
Transports #
InMemory Transport (included in main library) #
Perfect for development, testing, and monolith applications:
final (clientTransport, serverTransport) = RpcInMemoryTransport.pair();
// Usage: development, unit tests, simple applications
π Zero-Copy
For maximum performance, RPC Dart supports zero-copy object transfer without serialization (only for RpcInMemoryTransport
):
// Zero-copy contract β just don't specify codecs!
addUnaryMethod<UserRequest, UserResponse>(
methodName: 'GetUser',
handler: (request, {context}) async {
return UserResponse(id: request.id, name: 'User ${request.id}');
},
// DON'T specify codecs = automatic zero-copy
);
Data Transfer Modes #
RPC Dart supports three data transfer modes in contracts:
Mode | Description | Usage |
---|---|---|
zeroCopy |
Force zero-copy, codecs ignored | Maximum performance |
codec |
Force serialization, codecs required | Universal compatibility |
auto |
Smart choice: no codecs β zero-copy, has codecs β serialization | Flexible development |
auto
mode (recommended) automatically determines optimal transfer method:
final class SmartService extends RpcResponderContract {
SmartService() : super('SmartService',
dataTransferMode: RpcDataTransferMode.auto); // β Smart mode
void setup() {
// Will automatically choose zero-copy (no codecs specified)
addUnaryMethod<FastRequest, FastResponse>(
'fastMethod',
handler: fastHandler,
);
// Will automatically choose serialization (codecs specified)
addUnaryMethod<JsonRequest, JsonResponse>(
'universalMethod',
handler: universalHandler,
requestCodec: jsonRequestCodec, // β Codecs specified
responseCodec: jsonResponseCodec,
);
}
}
Additional Transports #
You can use ready-made transports from rpc_dart_transports package or implement your own via IRpcTransport
interface.
Available transports:
- Isolate Transport β for CPU-intensive tasks and failure isolation
- HTTP Transport β for microservices and distributed systems
- WebSocket Transport β for real-time applications
Key advantage: domain code remains unchanged when switching transports!
Transport Router #
Transport Router β smart proxy for routing RPC calls between transports using priority-based rules.
Main Features #
- Service-based routing β directs requests to different services on different transports
- Conditional routing β complex routing logic with context access
- Rule priorities β precise control over condition checking order
- Automatic routing β uses headers from
RpcCallerEndpoint
Usage Example #
// Create transports for different services
final (userClient, userServer) = RpcInMemoryTransport.pair();
final (orderClient, orderServer) = RpcInMemoryTransport.pair();
final (paymentClient, paymentServer) = RpcInMemoryTransport.pair();
// Create router with rules
final router = RpcTransportRouterBuilder()
.routeCall(
calledServiceName: 'UserService',
toTransport: userClient,
priority: 100,
)
.routeCall(
calledServiceName: 'OrderService',
toTransport: orderClient,
priority: 100,
)
.routeWhen(
toTransport: paymentClient,
whenCondition: (service, method, context) =>
service == 'PaymentService' &&
context?.getHeader('x-payment-method') == 'premium',
priority: 150,
description: 'Premium payments to separate service',
)
.build();
// Use router as regular transport
final callerEndpoint = RpcCallerEndpoint(transport: router);
final userService = UserCaller(callerEndpoint);
final orderService = OrderCaller(callerEndpoint);
// Requests automatically routed to appropriate transports
final user = await userService.getUser(request); // β userClient
final order = await orderService.createOrder(data); // β orderClient
Use cases: Microservice architecture, A/B testing, load balancing, service isolation.
RPC Interaction Types #
Type | Description | Example Usage |
---|---|---|
Unary Call | Request β Response | CRUD operations, validation |
Server Stream | Request β Stream of responses | Live updates, progress |
Client Stream | Stream of requests β Response | Batch upload, aggregation |
Bidirectional Stream | Stream β Stream | Chats, real-time collaboration |
Built-in Primitives #
RPC Dart provides ready wrappers for primitive types:
// Built-in primitives with codecs
final name = RpcString('John'); // Strings
final age = RpcInt(25); // Integers
final height = RpcDouble(175.5); // Doubles
final isActive = RpcBool(true); // Booleans
final tags = RpcList<RpcString>([...]); // Lists
// Convenient extensions
final message = 'Hello'.rpc; // RpcString
final count = 42.rpc; // RpcInt
final price = 19.99.rpc; // RpcDouble
final enabled = true.rpc; // RpcBool
// Numeric primitives support arithmetic operators
final sum = RpcInt(10) + RpcInt(20); // RpcInt(30)
final product = RpcDouble(3.14) * RpcDouble(2.0); // RpcDouble(6.28)
// Access value through .value property
final greeting = RpcString('Hello ') + RpcString('World');
print(greeting.value); // "Hello World"
StreamDistributor #
StreamDistributor β powerful manager for server streams that turns regular StreamController
into message broker with advanced capabilities:
Main Features #
- Broadcast publishing β send messages to all connected clients
- Filtered publishing β send conditionally to specific clients
- Lifecycle management β automatic stream creation/deletion
- Automatic cleanup β remove inactive streams by timer
- Metrics and monitoring β track activity and performance
Usage Example #
// Create distributor for notifications
final distributor = StreamDistributor<NotificationEvent>(
config: StreamDistributorConfig(
enableAutoCleanup: true,
inactivityThreshold: Duration(minutes: 5),
),
);
// Create client streams for different users
final userStream1 = distributor.createClientStreamWithId('user_123');
final userStream2 = distributor.createClientStreamWithId('user_456');
// Listen to notifications
userStream1.listen((notification) {
print('User 123 received: ${notification.message}');
});
// Send to all clients
distributor.publish(NotificationEvent(
message: 'System notification for everyone',
priority: Priority.normal,
));
// Send only to clients with specific IDs
distributor.publishFiltered(
NotificationEvent(message: 'VIP notification'),
(client) => ['user_123', 'premium_user_789'].contains(client.clientId),
);
// Get metrics
final metrics = distributor.metrics;
print('Active clients: ${metrics.currentStreams}');
print('Messages sent: ${metrics.totalMessages}');
Use cases: Perfect for implementing real-time notifications, chats, live updates and other pub/sub scenarios in server streams.
Testing #
// Unit test with mock (use any mock library)
class MockUserService extends Mock implements UserCaller {}
test('should handle user not found', () async {
final mockUserService = MockUserService();
when(() => mockUserService.getUser(any()))
.thenThrow(RpcException(code: RpcStatus.NOT_FOUND));
final bloc = UserBloc(mockUserService);
expect(
() => bloc.add(LoadUserEvent(id: '123')),
emitsError(isA<UserNotFoundException>()),
);
});
// Integration test with InMemory transport
test('full integration test', () async {
final (clientTransport, serverTransport) = RpcInMemoryTransport.pair();
final server = RpcResponderEndpoint(transport: serverTransport);
server.registerServiceContract(UserResponder(MockUserRepository()));
server.start();
final client = UserCaller(RpcCallerEndpoint(transport: clientTransport));
final response = await client.getUser(GetUserRequest(id: '123'));
expect(response.user.id, equals('123'));
});
FAQ #
Is it production ready?
We recommend thoroughly testing the library in your specific environment before production deployment.
How to test RPC code?
// Unit tests with mocks
class MockUserService extends Mock implements UserCaller {}
test('should handle user not found error', () async {
final mockService = MockUserService();
when(() => mockService.getUser(any()))
.thenThrow(RpcException(code: RpcStatus.NOT_FOUND));
final bloc = UserBloc(mockService);
expect(() => bloc.loadUser('123'), throwsA(isA<UserNotFoundException>()));
});
// Integration tests with InMemory transport
test('full integration test', () async {
final (client, server) = RpcInMemoryTransport.pair();
final endpoint = RpcResponderEndpoint(transport: server);
endpoint.registerServiceContract(TestService());
endpoint.start();
final caller = TestCaller(RpcCallerEndpoint(transport: client));
final result = await caller.getData();
expect(result.value, equals('expected'));
});
What performance to expect?
RPC Dart is optimized for real applications. In typical scenarios performance is more than sufficient:
Practical observations:
- InMemory transport has minimal overhead
- CBOR serialization is more efficient than JSON
- HTTP transport adds network latency
- Streaming is efficient for large data volumes
For most applications, development convenience and architectural clarity are more important than micro-optimizations.
How to handle errors?
RPC Dart uses gRPC statuses for unified error handling:
try {
final result = await userService.getUser(request);
} on RpcException catch (e) {
showError('RPC error: ${e.message}');
} on RpcDeadlineExceededException catch (e) {
showError('Request timeout: ${e.timeout}');
} on RpcCancelledException catch (e) {
showError('Operation cancelled: ${e.message}');
} catch (e) {
// Handle unexpected errors
logError('Unexpected error', error: e);
}
How to scale RPC architecture?
CORD scaling principles:
- Separate domains β each domain should have clear responsibility
- Use contracts β for type-safe communication
- Minimize coupling β domains communicate only through RPC
- Centralize logic β business logic in Responders
- Cache results β in Callers for UI optimization
// β Bad - direct dependencies
class OrderBloc {
final UserRepository userRepo;
final PaymentRepository paymentRepo;
final NotificationRepository notificationRepo;
}
// β
Good - through RPC contracts
class OrderBloc {
final UserCaller userService;
final PaymentCaller paymentService;
final NotificationCaller notificationService;
}
Benchmark
Useful Links:
Build scalable Flutter applications with RPC Dart!