rpc_dart_data 2.1.1
rpc_dart_data: ^2.1.1 copied to clipboard
Data service layer (CRUD + querying + offline sync) built on rpc_dart.
rpc_dart_data #
rpc_dart_data is the high-level data layer (CRUD + queries + change streams + offline sync) that sits on top of rpc_dart.
It gives you a transport-agnostic contract, ready-to-use storage adapters, and utilities for building offline-friendly backends and clients.
-## Feature highlights
- Unified
DataServicecontract with helpers for create/get/list/update/patch/delete/deleteCollection. - Bulk workflows via
bulkUpsert,bulkUpsertStream, andbulkDeleteto move large batches atomically. - Search & aggregations (
search,aggregate) delegated to the storage adapter, including backend pagination. - Streaming snapshots: export/import the full database as NDJSON, stream it through
payloadStream, or setincludePayloadString: falseto skip building large strings. - Change streams & offline sync:
watchChangeswith cursors, bidirectionalsyncChanges, andOfflineCommandQueuefor durable command queues. - SQLite adapter with SQLCipher support (auto-loads
libsqlcipherwhen a key is provided, usessqlite3mc.wasmon web) and aSqliteSetupHookso you can register custom pragmas before the database is exposed to your code. - Ready-made environments (
DataServiceFactory.inMemory) for tests, demos, and local prototyping. - Collection discovery: RPC
listCollections()queries the storage adapter for the current list of collection names so clients can introspect available datasets without enumerating all records.
This package now depends directly only on rpc_dart, sqlite3, and helpers such as licensify/args. The former drift and rpc_dart_transports dependencies were removed so the core data layer stays lean—plug in whatever transport or storage adapter fits your architecture.
Architecture in seven layers #
- Transport (WebSocket / HTTP/2 / isolates / TURN / in-memory) provided by
rpc_dartimplementations (you can plug in anyIRpcTransport; this package now relies on the transports shipped byrpc_dartinstead of bundlingrpc_dart_transportsdirectly). - Endpoint (
RpcCallerEndpoint/RpcResponderEndpoint). - Contract + codecs (
IDataServiceContractandRpcCodec<...>). - Low-level plumbing (
DataServiceCaller/DataServiceResponder). - Repository + storage adapter (
BaseDataRepository, change journal, and the concrete adapter: in-memory or SQLite). - Facade (
DataServiceClient,DataServiceServer,DataServiceFactory,InMemoryDataServiceEnvironment). - Offline utilities such as
OfflineCommandQueueand reconnection helpers.
Consumers usually interact only with layer 6 while everything below stays private.
Quick start #
import 'package:rpc_dart/rpc_dart.dart';
import 'package:rpc_dart_data/rpc_dart_data.dart';
Future<void> main() async {
final env = await DataServiceFactory.inMemory();
final client = env.client;
final ctx = RpcContext.withHeaders({'authorization': 'Bearer dev'});
final created = await client.create(
collection: 'notes',
payload: {'title': 'Hello', 'done': false},
context: ctx,
);
final sub = client
.watchChanges(collection: 'notes', context: ctx)
.listen((event) => print('change=${event.type} id=${event.id} v=${event.version}'));
await client.patch(
collection: 'notes',
id: created.id,
expectedVersion: created.version,
patch: const RecordPatch(set: {'done': true}),
context: ctx,
);
await Future<void>.delayed(const Duration(milliseconds: 50));
await sub.cancel();
await env.dispose();
}
See example/extended_demo.dart for a larger walkthrough.
Streaming export and import #
DataRepository.exportDatabase writes every snapshot as newline-delimited JSON (NDJSON).
Each line contains a header, collection, record, collectionEnd, or footer entry. You can either:
- Work with the whole payload as a string (default), or
- Set
ExportDatabaseRequest(includePayloadString: false)and consume thepayloadStreamto keep memory flat while sending it over the wire.
Imports accept the same format. When replaceExisting is true, missing collections are removed and re-created so the target repository matches the snapshot exactly. Legacy JSON (format version 1.0.0) is still accepted for backward compatibility, but it requires loading the entire blob in memory first.
ImportDatabaseRequest always validates the snapshot stream before mutating any collections, and the importer processes records in databaseImportBatchSize chunks, so very large dumps no longer spike RAM usage.
Example: piping exported data #
final export = await repository.exportDatabase(
const ExportDatabaseRequest(includePayloadString: false),
);
await for (final chunk in export.payloadStream!) {
sink.add(chunk); // write to file/socket/etc
}
Offline queue and sync commands #
final env = await DataServiceFactory.inMemory();
final client = env.client;
final ctx = RpcContext.withHeaders({'authorization': 'Bearer x'});
final queue = client.createOfflineQueue(sessionId: 'device-1');
final command = queue.buildCreateCommand(
const CreateRecordRequest(collection: 'tasks', payload: {'title': 'Draft'}),
);
final persistedJson = command.toJson();
final ackFuture = queue.enqueueCommand(
DataCommand.fromJson(persistedJson),
autoStart: false,
context: ctx,
);
await queue.start(context: ctx);
await queue.flushPending();
final ack = await ackFuture;
print('applied=${ack.applied} id=${ack.record?.id}');
Use enqueueCommand(..., resolveConflicts: false) if you prefer the queue to stop on conflicts instead of retrying automatically.
DataServiceClient now includes:
listAllRecords– paginates throughlist()until the entire collection is in memory.bulkUpsertStream– streams large batches directly to the repository.pushAndAwaitAck– sends a single sync command and waits for the response.createOfflineQueue– a convenience constructor forOfflineCommandQueue.close– shuts down the underlying RPC endpoint.
Aggregations #
final metrics = await client.aggregate(
collection: 'orders',
metrics: {
'countAll': 'count',
'sumPrice': 'sum:price',
'avgPrice': 'avg:price',
'minPrice': 'min:price',
'maxPrice': 'max:price',
},
context: ctx,
);
print(metrics.metrics);
The repository validates metric expressions and delegates supported work to the storage adapter.
Change streams and optimistic concurrency #
watchChangesaccepts an optionalcursor, so clients can resume after a reconnect.- SQLite keeps a persistent
s_change_journaltable, allowing cursors to survive restarts. updateandpatchrequire anexpectedVersion. A mismatch triggersRpcDataError.conflict(...)(or a transportRpcException).
Listing collections #
Call DataServiceClient.listCollections() to retrieve the current catalog of collection names without enumerating records.
final collections = await client.listCollections(context: ctx);
print('collections: ${collections.join(', ')}');
The RPC experience simply forwards to DataStorageAdapter.listCollections(), so the result reflects whatever tables your adapter created.
Extending the storage layer #
Implement DataStorageAdapter to plug in any backend (PostgreSQL, Elastic, Firestore, ...):
class PostgresAdapter implements DataStorageAdapter {
@override
Future<DataRecord?> readRecord(String collection, String id) async {
// Query your store and return DataRecord when a row exists.
}
@override
Future<ListRecordsResponse> queryCollection(ListRecordsRequest request) async {
// Apply filters + pagination server-side.
}
// Implement searchCollection, aggregateCollection, writeRecords, deleteCollection, etc.
}
Adapters participate in streaming export/import by overriding readCollectionChunks, and they can expose custom indices via CollectionIndexStorageAdapter if needed.
SQLite profile #
SqliteDataStorageAdapter ships as the default single-node implementation:
- Creates per-collection tables on demand plus indexes on
version,created_at, andupdated_at. - Delegates filtering, sorting, and pagination to SQL for predictable O(log N) plans.
- Uses batched UPSERT statements to keep
writeRecordsfast even when thousands of rows are updated. - SQLCipher:
- Pass a
SqlCipherKeyto enable encryption; the runtime will auto-loadlibsqlcipheron macOS/Linux (checksSQLITE3_LIB_DIR/SQLITE3_LIB_NAMEor common Homebrew paths) before opening the database and will fail fast if cipher pragmas are missing. - On the web, the adapter uses the
sqlite3mc.wasmbuild (SQLite3MultipleCiphers) by default; setwebSqliteWasmUrito a custom location if you host your own WASM. - You can still register extra PRAGMA via
SqliteSetupHookfor fine-tuning.
- Pass a
- Stores
watch()/sync()events in a durable journal, so cursor-based clients recover after restarts.
Testing #
Smoke tests are located in test/data_service_facade_test.dart. Run the whole suite with:
dart test --concurrency=1 -r compact
Examples #
example/quick_start.dartexample/offline_sync.dartexample/extended_demo.dart
Production checklist #
- Schedule regular backups using
exportDatabase; periodically restore them into a staging environment withimportDatabase. - Restrict RPC access through a reverse proxy with authentication and rate limiting.
- Monitor the SQLite database and change-journal size (e.g., via
sqlite3CLI or metrics collectors). - Keep an eye on the CHANGELOG for breaking changes and new capabilities.
License #
MIT