data_management
A production-ready, offline-first Flutter data layer that abstracts Firestore, Hive, SQLite, or any custom backend behind a single typed API.
Features
- Typed repository —
LocalDataRepository/RemoteDataRepositoryover anyDataSource - Dual-write — primary-first writes with optional eager or lazy backup mirroring
- Offline queue — failed remote writes are persisted and replayed automatically on reconnect
- Fallback streams — real-time listeners switch between remote and local sources based on connectivity, with debounce
@reference fields — embed sibling/sub-document writes or reads inline, resolved in one batch#countable fields — store collection paths; resolved to live integer counts on readDataFieldValuesentinels —serverTimestamp,increment,arrayUnion,arrayRemove,deleteDataFieldValueWriter— atomic set / update / delete of related documents inside a single batch writeDataFieldValueReader— deferred read-time resolution (get, count, filter) of related pathsDataFieldPath.documentId— backend-agnostic document-ID queries- Cascade delete — follows
@and#refs and deletes all referenced documents in batches - In-memory + persistent cache — TTL-aware LRU cache with optional storage adapter
- AES-256-CBC encryption — per-document encryption with platform-adaptive backend (IO / Web)
- Multicast streams — shared upstream subscriptions, zero duplicate listeners
DataOperationSemaphore— configurable concurrency limit for parallel ref resolution
Installation
dependencies:
data_management: ^2.4.9
Setup
Configure the global DM singleton once at app start, before constructing any repository.
void main() {
DM.i.configure(
connectivity: MyConnectivityDelegate(), // implements DataConnectivityDelegate
cache: MyHiveCacheDelegate(), // implements DataCacheDelegate
);
runApp(const MyApp());
}
Quick Start
1. Define your entity
class User extends Entity {
final String name;
final String email;
User({required super.id, required this.name, required this.email});
@override
Map<String, dynamic> get source => {'name': name, 'email': email};
factory User.fromMap(Map<String, dynamic> map) => User(
id: map[EntityKey.i.id] ?? '',
name: map['name'] ?? '',
email: map['email'] ?? '',
);
}
2. Define your source
class UserSource extends RemoteDataSource<User> {
UserSource()
: super(
path: 'users',
documentId: 'id',
delegate: FirestoreDelegate(),
);
@override
User build(dynamic source) => User.fromMap(source as Map<String, dynamic>);
}
3. Define your repository
class UserRepository extends RemoteDataRepository<User> {
UserRepository()
: super(
source: UserSource(),
backupMode: true, // mirror writes to local backup
lazyMode: true, // mirror in background
queueMode: true, // queue writes when offline
);
}
4. Use it
final repo = UserRepository();
// Write
await repo.create(User(id: 'u1', name: 'Alice', email: 'alice@example.com'));
// Read
final result = await repo.getById('u1');
print(result.data?.name); // Alice
// Listen
repo.listenById('u1').listen((response) {
print(response.data?.name);
});
Core Concepts
@ Reference Fields
A field whose key starts with @ embeds a reference to another document.
On write (createRefs: true / updateRefs: true) — the value is a DataFieldValueWriter that creates, updates, or deletes the referenced document in the same atomic batch as the parent.
On read (resolveRefs: true) — the system fetches the referenced document and inlines it as a nested map, removing the @ prefix from the key.
// Write: creates 'users/u1' + 'users/u1/profile' in one batch
await repo.createById('u1', {
'name': 'Alice',
'@profile': DataFieldValueWriter.set(
'users/u1/profile',
{'bio': 'Flutter developer', 'avatar': 'https://...'},
),
}, createRefs: true);
// Read: '@profile' → 'profile': { 'bio': '...', 'avatar': '...' }
final result = await repo.getById('u1', resolveRefs: true);
print(result.data!.filtered['profile']); // { bio: Flutter developer, ... }
List of references — an @ field can hold a List of writers; each is written atomically and resolved to a List<Map> on read.
await repo.createById('post1', {
'title': 'Hello',
'@variants': [
DataFieldValueWriter.set('products/p1/variants/v_red', {'color': 'Red'}),
DataFieldValueWriter.set('products/p1/variants/v_blue', {'color': 'Blue'}),
],
}, createRefs: true);
Map of references — an @ field can hold a Map<String, Writer>; resolved to Map<String, Map> on read.
await repo.createById('p1', {
'@byRegion': {
'us': DataFieldValueWriter.set('products/p1/regions/us', {'price': 599.0}),
'bd': DataFieldValueWriter.set('products/p1/regions/bd', {'price': 65000.0}),
},
}, createRefs: true);
# Countable Fields
A field whose key starts with # stores a collection path. On read (countable: true) the system replaces the field with the live document count of that collection.
// Write: stores the collection path
await repo.createById('u1', {
'name': 'Alice',
'#posts': 'users/u1/posts',
'#notifications': 'users/u1/notifications',
});
// Read: '#posts' → 'posts': 7, '#notifications' → 'notifications': 3
final result = await repo.getById('u1', countable: true);
print(result.data!.filtered['posts']); // 7
print(result.data!.filtered['notifications']); // 3
DataFieldValue — Write Sentinels
await repo.updateById('u1', {
'lastSeen': DataFieldValue.serverTimestamp(), // backend server time
'score': DataFieldValue.increment(10), // atomic increment
'tags': DataFieldValue.arrayUnion(['vip']),
'oldBadge': DataFieldValue.arrayRemove(['legacy']),
'tempToken': DataFieldValue.delete(), // remove the field
});
DataFieldValueWriter — Inline Batch Sub-Writes
Embed writes to any path inside a parent write map. All are committed atomically.
await repo.createById('order1', {
'total': 199.0,
'status': 'pending',
// set a sub-document
'@invoice': DataFieldValueWriter.set(
'orders/order1/invoice/default',
{'items': 3, 'vat': 19.9},
),
// update a sibling document
'@userStats': DataFieldValueWriter.update(
'users/u1/stats',
{'orderCount': DataFieldValue.increment(1)},
),
// delete a document in the same batch
'@cart': DataFieldValueWriter.delete('users/u1/cart'),
}, createRefs: true);
DataFieldValueReader — Deferred Read-Time Resolution
Store a reader as a field value so every subsequent read resolves it live.
await repo.updateById('u1', {
// fetch the doc at the path and inline it
'@settings': DataFieldValueReader.get('users/u1/settings'),
// count the collection and inline the integer
'#posts': DataFieldValueReader.count('users/u1/posts'),
// query the collection with filters and inline as array
'@recentPosts': DataFieldValueReader.filter(
'users/u1/posts',
DataFieldValueQueryOptions(
queries: [DataQuery('published', isEqualTo: true)],
sorts: [DataSorting('createdAt', descending: true)],
options: const DataFetchOptions.limit(5),
),
),
});
DataFieldPath.documentId — Filter by Document ID
await repo.getByQuery(
queries: [
DataQuery(DataFieldPath.documentId, whereIn: ['u1', 'u2', 'u3']),
],
);
DataQuery — All Filter Operators
DataQuery('field', isEqualTo: value)
DataQuery('field', isNotEqualTo: value)
DataQuery('field', isLessThan: value)
DataQuery('field', isLessThanOrEqualTo: value)
DataQuery('field', isGreaterThan: value)
DataQuery('field', isGreaterThanOrEqualTo: value)
DataQuery('field', arrayContains: value)
DataQuery('field', arrayContainsAny: [v1, v2])
DataQuery('field', whereIn: [v1, v2])
DataQuery('field', whereNotIn: [v1, v2])
DataQuery('field', isNull: true)
// Composite AND / OR
DataQuery.filter(DataFilter.and([
DataFilter('status', isEqualTo: 'active'),
DataFilter('age', isGreaterThan: 18),
]))
DataQuery.filter(DataFilter.or([
DataFilter('role', isEqualTo: 'admin'),
DataFilter('role', isEqualTo: 'moderator'),
]))
DataSelection — Cursor Pagination
DataSelection.startAt([value])
DataSelection.startAfter([value])
DataSelection.startAtDocument(snapshot)
DataSelection.startAfterDocument(snapshot)
DataSelection.endAt([value])
DataSelection.endBefore([value])
DataSelection.endAtDocument(snapshot)
DataSelection.endBeforeDocument(snapshot)
DataFetchOptions — Page Size
const DataFetchOptions.limit(20) // first 20
const DataFetchOptions.limit(20, true) // last 20
const DataFetchOptions.single() // first 1
DataFetchOptions(fetchingSize: 20, initialFetchSize: 5)
DataSorting
DataSorting('createdAt', descending: true)
DataSorting('name') // ascending default
DataFieldParams — Dynamic Path Replacement
Paths may contain {placeholder} segments resolved at call time.
// source path: 'orgs/{orgId}/teams/{teamId}/members'
// by name
repo.getById('u1', params: KeyParams({'orgId': 'org42', 'teamId': 't7'}));
// by position
repo.getById('u1', params: IterableParams(['org42', 't7']));
Write API
| Method | Description |
|---|---|
create(entity) |
Write entity using its id + filtered map |
createById(id, data) |
Write a document with explicit id + data map |
creates(entities) |
Batch-create multiple entities |
createByWriters(writers) |
Batch-create from explicit DataWriter list |
updateById(id, data) |
Partial update (preserves untouched fields) |
updateByWriters(writers) |
Partial update multiple documents |
deleteById(id) |
Delete one document (optionally cascade) |
deleteByIds(ids) |
Delete multiple documents |
clear() |
Delete all documents in the collection |
write(writers) |
Low-level heterogeneous atomic batch |
Cascade Delete
await repo.deleteById(
'u1',
deleteRefs: true, // follow @ fields
counter: true, // also delete # collection docs
ignore: (key, _) => key == '@avatar', // skip specific fields
);
Batch Write (low-level)
await repo.write([
DataSetWriter('col/doc1', {'field': 'value'}),
DataUpdateWriter('col/doc2', {'score': DataFieldValue.increment(1)}),
DataDeleteWriter('col/doc3'),
]);
Read API
| Method | Description |
|---|---|
checkById(id) |
Existence check + optional auto-sync to backup |
count() |
Total document count in collection |
get() |
All documents in collection |
getById(id) |
Single document by ID |
getByIds(ids) |
Multiple documents by ID list |
getByQuery(...) |
Filter / sort / paginate |
search(checker) |
Client-side or prefix-scan text search |
Hydration flags (all read methods)
| Flag | Effect |
|---|---|
resolveRefs: true |
Fetch and inline @-prefixed reference fields |
countable: true |
Replace #-prefixed fields with live integer counts |
onlyUpdates: true |
Return only changed documents (docChanges) |
ignore: (key, _) => ... |
Skip specific fields during hydration |
cacheMode: true |
Cache result in memory for subsequent calls |
backupMode: true |
Fall back to backup source when primary fails |
lazyMode: true |
Sync result to backup in the background |
Listen API
| Method | Description |
|---|---|
listen() |
Real-time stream of all documents |
listenById(id) |
Real-time stream of a single document |
listenByIds(ids) |
Real-time merged stream of multiple documents |
listenByQuery(...) |
Real-time filtered / sorted stream |
listenCount() |
Real-time document count stream |
All listen methods support the same hydration flags as read methods.
For RemoteDataRepository with a local backup, streams automatically fall back to the local source when connectivity is lost (300 ms debounce).
StreamBuilder<Response<User>>(
stream: repo.listenByQuery(
queries: [DataQuery('active', isEqualTo: true)],
sorts: [DataSorting('name')],
resolveRefs: true,
countable: true,
),
builder: (context, snapshot) {
final users = snapshot.data?.result ?? [];
return ListView(children: users.map((u) => Text(u.name)).toList());
},
);
Offline Queue
When queueMode: true (default for RemoteDataRepository), writes that fail with Status.networkError are persisted to DM.i.cache and replayed automatically when connectivity returns.
- Up to 5 retry attempts per operation before it is discarded
- Supersession logic: a pending
createorupdatefor the same document is removed when adeletefor the same ID is enqueued - Call
DM.i.drainAll()to force an immediate drain (e.g. after pull-to-refresh)
Local ↔ Remote Sync
final repo = RemoteDataRepository<User>(
source: FirestoreUserSource(),
backup: HiveUserSource(),
backupMode: true,
lazyMode: true,
restoreMode: true,
);
// Hydrate local from remote on first launch
await repo.restore();
restore() checks whether the local source is empty (and whether the restore has already run) before pulling from the backup, so it is safe to call on every app start.
Encryption
final encryptor = DataEncryptor(
key: DataEncryptor.generateKey(), // store this securely
passcode: 'my-passcode',
);
class SecureUserSource extends RemoteDataSource<User> {
SecureUserSource()
: super(
path: 'users',
documentId: 'id',
delegate: FirestoreDelegate(),
encryptor: encryptor, // every document is AES-256-CBC encrypted
);
@override
User build(dynamic source) => User.fromMap(source as Map<String, dynamic>);
}
Multicast Streams
Extend MulticastDataDelegate instead of DataDelegate to share a single upstream Firestore listener across multiple subscribers for the same path.
class FirestoreDelegate extends MulticastDataDelegate {
FirestoreDelegate() : super(
multicastListen: true,
multicastListenById: true,
multicastListenByQuery: true,
);
// ...
}
Repository Options
| Option | Type | Default | Description |
|---|---|---|---|
backupMode |
bool |
true |
Mirror writes/reads to the optional backup source |
lazyMode |
bool |
true |
Perform backup operations in the background |
queueMode |
bool |
true |
Queue failed writes for offline replay |
restoreMode |
bool |
true |
Enable the restore() method |
cacheMode |
bool |
false |
Cache read results in memory by default |
backupFlushInterval |
Duration |
30s |
How often the local→remote flush timer fires |
backupFlushSize |
int |
50 |
Flush immediately after this many queued ops |
Error Handling
// Silent — swallow all errors
final repo = RemoteDataRepository<User>(
source: UserSource(),
errorDelegate: ErrorDelegate.silent,
);
// Printing — debugPrint every error (default)
final repo = RemoteDataRepository<User>(
source: UserSource(),
errorDelegate: ErrorDelegate.printing,
);
// Custom
class MyErrorDelegate implements ErrorDelegate {
@override
void onError(DataOperationError error) {
Sentry.captureException(error.cause, stackTrace: error.stack);
}
}
Response
Every method returns Response<T> from flutter_entity.
final r = await repo.getById('u1');
r.isSuccessful // status == ok
r.isValid // isSuccessful && result is not empty
r.data // first item or null
r.result // Iterable<T>
r.error // error string or null
r.status // Status enum value
r.snapshot // raw backend snapshot (QuerySnapshot, DocumentSnapshot, etc.)
Status Codes
| Status | Meaning |
|---|---|
Status.ok |
Operation succeeded |
Status.notFound |
Document / collection is empty |
Status.invalidId |
Empty id was passed |
Status.invalid |
Empty data / writers list |
Status.networkError |
Connectivity failure |
Status.failure |
Unexpected exception |
Status.canceled |
Partial success or operation skipped |
Status.nullable |
Encryption produced null payload |
Status.undefined |
Backup source not configured |
License
MIT