mongo_realtime 3.0.4
mongo_realtime: ^3.0.4 copied to clipboard
A Dart package that allows you to listen in real-time to changes in a MongoDB database
MongoRealtime #
A pure Dart package that allows you to listen in real-time to changes in a MongoDB database via a WebSocket connection to a bridge server using the Node.js package mongo-realtime.

Features #
- Listen to insert, update, delete events on MongoDB collections in real-time.
- Filter and sort documents with a fluent query builder API.
- Support custom document identifier keys through
idField. - Type-safe document mapping with generic types.
- Stream changes as raw documents or typed values.
- Optimistic updates with offline support.
- Lightweight, pure Dart - no Flutter dependencies.
- Automatic connection management and reconnection.
- Public connection lifecycle controls and callbacks.
Getting Started #
1. Install the Dart Package #
Add mongo_realtime to your pubspec.yaml or via:
dart pub add mongo_realtime
2. Set Up the Node.js Bridge (optional) #
This package relies on a Node.js bridge using mongo-realtime.
Install it (on the server):
npm install mongo-realtime
Create a simple Node.js server:
const { MongoRealTimeServer } = require("mongo-realtime");
const server = new MongoRealTimeServer({
mongoUri: "mongodb://localhost:27017/mydb",
dbName: "mydb",
});
await server.start();
Make sure your MongoDB instance is a replica set.
Important: If your server is not using the mongo-realtime package, it should emit db events itself through a socket.
3. Use in Dart #
Basic usage
import 'package:mongo_realtime/mongo_realtime.dart';
void main() async {
// Connect to the server
// Use `connect` before accessing the singleton instance `MongoRealtime.instance` or `realtime`
MongoRealtime.connect('ws://localhost:3000');
realtime.collection('users').stream.listen((documents) {
print('Users: ${documents.map((doc) => doc.id).toList()}');
});
// Or create a new instance:
final client = MongoRealtime('ws://localhost:3000');
// Stream all documents from a collection
client.collection('users').stream.listen((documents) {
print('Users: ${documents.map((doc) => doc.id).toList()}');
});
// Listen to database changes
client.watch.onInsert((change) {
print('Inserted: ${change.docId} in ${change.collection}');
});
// Using a different url schema (e.g http instead of ws) also works, it will be converted to ws internally
final client2 = MongoRealtime('localhost:3000'); // This will connect to ws://localhost:3000
final client3 = MongoRealtime('https://my-api.com'); // This will connect to wss://my-api.com
}
MongoRealtime.instance and the global realtime getter are only available
after calling MongoRealtime.connect(...). If you want to use the package
without a singleton, create a standalone client with MongoRealtime(url).
Using auth data
If your server is setup with authentication, you may need to provide authentication data (e.g. a JWT token) when connecting. See MongoRealtime Authentication for more details on the server side setup.
MongoRealtime.connect(
'my_server:3000',
authData: 'my_jwt_token',
);
// or a Map for more complex auth data
MongoRealtime.connect(
'my_server:3000',
authData: {
'token': 'my_jwt_token',
'userId': '123',
},
);
Connection lifecycle
final client = MongoRealtime(
'ws://localhost:3000',
autoReconnectConfig: const RealtimeReconnectConfig(
delay: Duration(seconds: 2),
maxAttempts: 5,
),
onConnected: (client) {
print('connected: ${client.url}');
},
onDisconnected: (error) {
print('disconnected: $error');
},
);
await client.reconnect();
print(client.isConnected);
await client.disconnect();
print(client.isConnected);
Query with filters and sorting
// Find documents with query
final users = await realtime.collection('users')
.where('age', isGreaterThan: 18)
.where('role', isEqualTo: 'admin')
.sort('createdAt', descending: true)
.limit(10)
.find();
// Stream with real-time updates
realtime.collection('users')
.where('status', isEqualTo: 'active')
.stream
.listen((documents) {
print('Active users: $documents');
});
// Stream typed values directly
realtime.collection<User>(
'users',
fromJson: (json) => User.fromJson(json), // required or else streamWithValue will throw a state error
)
.where('status', isEqualTo: 'active')
.streamWithValue
.listen((users) {
print('Active user names: ${users.map((user) => user.name).toList()}');
});
// `or` queries
// All clauses inside `or` builder are combined with OR, and the result is combined with the rest of the query with AND
realtime.collection('users')
.where('age', isGreaterThan: 18)
.or((q) {
// This will match users who are adults admin OR adults with active status
q.where('role', isEqualTo: 'admin');
q.where('status', isEqualTo: 'active');
})
.stream.listen((documents) {
print('Users who are either adults or active admins: $documents');
});
Type-safe document mapping
class User {
final String id;
final String name;
final int age;
User.fromJson(Map<String, dynamic> json)
: id = json['_id'],
name = json['name'],
age = json['age'];
}
// Map documents to User objects
final users = await realtime.collection<User>('users',
fromJson: (json) => User.fromJson(json),
).find();
users.forEach((doc) {
print('${doc.value?.name} (${doc.value?.age})');
});
// Or listen to typed values directly
realtime.collection<User>(
'users',
fromJson: (json) => User.fromJson(json),
).streamWithValue.listen((users) {
print(users.map((user) => user.name).toList());
});
Using a custom document id field
By default, document references use MongoDB's _id field. If your collection
uses another unique field such as email, slug, or uuid, provide idField
when creating the collection reference.
final users = realtime.collection<User>(
'users',
idField: 'email',
fromJson: (json) => User.fromJson(json),
);
final user = users.doc('alice@example.com');
final current = await user.find();
user.streamWithValue.listen((value) {
print('Updated user: ${value?.name}');
});
await user.update(
$set: {'name': 'Alice'},
optimistic: true,
);
user.watch.onUpdate((change) {
print('User changed: ${change.docId}');
});
// Override the collection idField for one specific document reference
final userBySlug = users.doc('alice-profile', idField: 'slug');
await userBySlug.delete();
Insert, update, delete
final collection = realtime.collection('users');
// Insert
await collection.insert({'name': 'John', 'age': 30});
// Update
await collection.update(
$set: {'age': 31},
filter: {'name': 'John'},
);
// or
await collection.where('name', isEqualTo: 'John').update($set: {'age': 31});
// Delete
await collection.delete(filter: {'name': 'John'});
// Delete using the current query filter
await collection.where('inactive', isEqualTo: true).delete();
Document-specific operations
final doc = realtime.collection('users').doc('user_123');
// Get a specific document
final user = await doc.find();
// Listen to changes on a specific document
doc.stream.listen((user) {
print('User updated: $user');
});
// Listen to the typed value of a specific document
realtime.collection<User>(
'users',
fromJson: (json) => User.fromJson(json),
).doc('user_123').streamWithValue.listen((user) {
print('Typed user updated: ${user?.name}');
});
// Update a specific document
await doc.update($set: {'age': 25});
// Delete a specific document
await doc.delete();
Database change watchers
// Listen to all changes
realtime.watch.onChange((change) {
print('Changed: ${change.docId} in ${change.collection}');
});
// Listen to specific collection
realtime.collection('users').watch.onInsert((change) {
print('New user: ${change.docId}');
});
// Listen to specific document
realtime.collection('users').doc('123').watch.onUpdate((change) {
print('User 123 updated');
});
Multiple server instances
final prodServer = MongoRealtime('ws://prod-server:3000');
final devServer = MongoRealtime('ws://dev-server:3000');
// Listen to changes on different servers
prodServer.collection('users').stream.listen(
(users) => print('Prod users: $users')
);
devServer.collection('users').stream.listen(
(users) => print('Dev users: $users')
);
Using optimistic updates
final collection = client.collection('users');
// Optimistic insert - update local streams immediately
await collection.insert(
{'_id': 'new_user', 'name': 'Jane'},
optimistic: true,
);
// Optimistic update - local cache is updated before server confirmation
await collection.where('name', isEqualTo: 'Jane').update(
$push: {'roles': 'admin'},
optimistic: true,
);
// Optimistic delete - matching documents disappear from local streams immediately
await collection.where('name', isEqualTo: 'Jane').delete(
optimistic: true,
);
// Default is `optimistic: false`, which waits for the server event before updating streams.
API Overview #
Main Classes #
-
MongoRealtime: Main client for connecting to the realtime server.MongoRealtime.connect(url, authData?, autoReconnectConfig?, onConnected?, onDisconnected?): Connect to a WebSocket server (singleton).MongoRealtime(url, authData?, autoReconnectConfig?, onConnected?, onDisconnected?): Create a new instance.instance: Access the singleton instance afterconnect().realtime: Global getter for the singleton instance afterconnect().isConnected: Current websocket state.disconnect(): Close the websocket without disposing the client.reconnect([url]): Reconnect and resubscribe active queries.autoReconnectConfig: Auto-reconnect behavior for unexpected disconnections.
-
RealtimeReconnectConfig:enabled: Enable or disable auto-reconnect.delay: Delay between reconnect attempts.maxAttempts: Maximum number of reconnect attempts.nullmeans unlimited.
Collection & Document Access #
-
collection(name, {fromJson, idField}): Get a collection reference.- Returns
RealtimeCollectionReference<T>. idFielddefaults to_id.
- Returns
-
RealtimeCollectionReference<T>:where(field, ...): Add filter clause (supportsisEqualTo,isNotEqualTo,isGreaterThan,isGreaterOrEqualTo,isLowerThan,isLowerOrEqualTo,arrayContains,isIn,matches).or(builder): Add OR clause.sort(field, descending?): Sort by field.limit(count): Limit results.stream: Stream of all matching documents.streamWithValue: Stream matching typed values.find(): Fetch all matching documents once.doc(id, {idField}): Get a document reference.insert(doc, optimistic?): Insert a document.update(..., filter?, optimistic?): Update documents.delete(filter?, optimistic?): Delete documents.
-
doc(id, {idField}): Get a document reference.- Returns
RealtimeDocumentReference<T>.
- Returns
-
RealtimeDocumentReference<T>:stream: Stream this document's changes.streamWithValue: Stream the typed value of this document.find(): Fetch this document once.update(..., optimistic?): Update this document.delete(optimistic?): Delete this document.watch: Watch insert/update/delete events for this document.
Query Builder #
RealtimeQueryBuilder<T>:where(field, ...): Add AND clause with field conditions.or(builder): Add OR clause with multiple conditions.sort(field, descending?): Add sort order.limit(count): Limit number of documents.stream: Get as a broadcast stream.streamWithValue: Get typed values as a broadcast stream.find(): Execute query once.update(..., optimistic?): Update all matching documents.delete(optimistic?): Delete all matching documents.
Database Watchers #
DbWatcher: Watch for database changes.onChange(handler): Listen to any change.onInsert(handler): Listen to inserts only.onUpdate(handler): Listen to updates only.onDelete(handler): Listen to deletes only.offChange(handler): Unsubscribe from changes.offInsert(handler): Unsubscribe from inserts.offUpdate(handler): Unsubscribe from updates.offDelete(handler): Unsubscribe from deletes.
Return Types #
-
RealtimeDocument<T>: A document with metadata.id: Document ID.data: Raw document map.value: Typed value (iffromJsonwas provided).
-
RealtimeDbChange: A database change event.type: Insert, update, delete, or change.collection: Collection name.docId: Document ID.fullDocument: Full document (if available).cast<T>(fromJson): Cast to typed model.tryCast<T>(fromJson): Try to cast to typed model.
License #
MIT License