mongo_realtime 3.0.4

A Dart package that allows you to listen in real-time to changes in a MongoDB database

MongoRealtime

A pure Dart package for listening to changes in a MongoDB database in real time. It connects over WebSocket to a bridge server built on the Node.js package mongo-realtime.

Features

  • Listen to insert, update, and delete events on MongoDB collections in real time.
  • Filter and sort documents with a fluent query builder API.
  • Support custom document identifier keys through idField.
  • Type-safe document mapping with generic types.
  • Stream changes as raw documents or typed values.
  • Optimistic updates with offline support.
  • Lightweight, pure Dart - no Flutter dependencies.
  • Automatic connection management and reconnection.
  • Public connection lifecycle controls and callbacks.

Getting Started

1. Install the Dart Package

Add mongo_realtime to your pubspec.yaml, or install it with:

dart pub add mongo_realtime

2. Set Up the Node.js Bridge (optional)

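This package talks to a bridge server. The reference bridge is a Node.js server using mongo-realtime; you can skip this step if you already have a compatible server.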

Install it (on the server):

npm install mongo-realtime

Create a simple Node.js server:

const { MongoRealTimeServer } = require("mongo-realtime");

const server = new MongoRealTimeServer({
  mongoUri: "mongodb://localhost:27017/mydb",
  dbName: "mydb",
});

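// CommonJS modules do not support top-level await,
// so the server is started from an async function.
async function main() {
  await server.start();
}

main().catch(console.error);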

Make sure your MongoDB instance runs as a replica set (MongoDB only exposes change streams on replica sets and sharded clusters).

Important: If your server does not use the mongo-realtime package, it must emit the database change events itself over the socket connection.

3. Use in Dart

Basic usage

import 'package:mongo_realtime/mongo_realtime.dart';

void main() async {
  // Connect to the server
  // Use `connect` before accessing the singleton instance `MongoRealtime.instance` or `realtime`
  MongoRealtime.connect('ws://localhost:3000');
  realtime.collection('users').stream.listen((documents) {
    print('Users: ${documents.map((doc) => doc.id).toList()}');
  });

  // Or create a new instance:
  final client = MongoRealtime('ws://localhost:3000');

  // Stream all documents from a collection
  client.collection('users').stream.listen((documents) {
    print('Users: ${documents.map((doc) => doc.id).toList()}');
  });

  // Listen to database changes
  client.watch.onInsert((change) {
    print('Inserted: ${change.docId} in ${change.collection}');
  });

  // A different URL scheme (e.g. http instead of ws) also works; it is converted to ws/wss internally
  final client2 = MongoRealtime('localhost:3000'); // Connects to ws://localhost:3000
  final client3 = MongoRealtime('https://my-api.com'); // Connects to wss://my-api.com
}

MongoRealtime.instance and the global realtime getter are only available after calling MongoRealtime.connect(...). If you want to use the package without a singleton, create a standalone client with MongoRealtime(url).
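
The URL conversion mentioned in the comments above can be pictured with a small standalone sketch. The `toWebSocketUrl` helper is hypothetical and is not part of mongo_realtime; it only reproduces the three mappings shown in the example (bare host to ws, https to wss, ws left as-is), and the package's actual internal rules may differ.

```dart
/// Hypothetical helper, NOT part of mongo_realtime: maps a user-supplied URL
/// to a WebSocket URL following the mappings shown in the basic usage example.
String toWebSocketUrl(String url) {
  if (url.startsWith('https://')) {
    return 'wss://${url.substring('https://'.length)}';
  }
  if (url.startsWith('http://')) {
    return 'ws://${url.substring('http://'.length)}';
  }
  // ws://, wss://, or any other explicit scheme is left untouched.
  if (url.contains('://')) return url;
  // No scheme at all: assume an unencrypted WebSocket connection.
  return 'ws://$url';
}

void main() {
  print(toWebSocketUrl('localhost:3000')); // ws://localhost:3000
  print(toWebSocketUrl('https://my-api.com')); // wss://my-api.com
  print(toWebSocketUrl('ws://localhost:3000')); // ws://localhost:3000
}
```

Prefer an explicit wss:// or https:// URL in production so the connection is encrypted.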

Using auth data

If your server is set up with authentication, you may need to provide authentication data (e.g. a JWT token) when connecting. See MongoRealtime Authentication for more details on the server-side setup.

MongoRealtime.connect(
  'my_server:3000',
  authData: 'my_jwt_token',
);

// or a Map for more complex auth data
MongoRealtime.connect(
  'my_server:3000',
  authData: {
    'token': 'my_jwt_token',
    'userId': '123',
  },
);

Connection lifecycle

final client = MongoRealtime(
  'ws://localhost:3000',
  autoReconnectConfig: const RealtimeReconnectConfig(
    delay: Duration(seconds: 2),
    maxAttempts: 5,
  ),
  onConnected: (client) {
    print('connected: ${client.url}');
  },
  onDisconnected: (error) {
    print('disconnected: $error');
  },
);

await client.reconnect();
print(client.isConnected);

await client.disconnect();
print(client.isConnected);
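
To make the `delay` and `maxAttempts` settings concrete, here is a minimal standalone sketch of a retry loop with a fixed delay and an optional attempt cap (`null` meaning unlimited). The `connectWithRetry` function is hypothetical and does not come from mongo_realtime; it only illustrates the general pattern, not how the package implements auto-reconnect.

```dart
import 'dart:async';

/// Hypothetical helper, NOT part of mongo_realtime.
/// Calls [connect] until it succeeds or [maxAttempts] is reached.
/// A null [maxAttempts] means "retry forever".
Future<bool> connectWithRetry(
  Future<void> Function() connect, {
  Duration delay = const Duration(seconds: 2),
  int? maxAttempts = 5,
}) async {
  var attempt = 0;
  while (maxAttempts == null || attempt < maxAttempts) {
    attempt++;
    try {
      await connect();
      return true;
    } on Exception catch (error) {
      print('Attempt $attempt failed: $error');
      // Skip the wait when no further attempt will be made.
      if (maxAttempts != null && attempt >= maxAttempts) break;
      await Future<void>.delayed(delay);
    }
  }
  return false;
}

Future<void> main() async {
  var calls = 0;
  // Simulated connection that fails twice, then succeeds.
  final connected = await connectWithRetry(
    () async {
      calls++;
      if (calls < 3) throw Exception('server unavailable');
    },
    delay: const Duration(milliseconds: 10),
  );
  print('connected: $connected after $calls attempts');
}
```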

Query with filters and sorting


// Find documents with query
final users = await realtime.collection('users')
  .where('age', isGreaterThan: 18)
  .where('role', isEqualTo: 'admin')
  .sort('createdAt', descending: true)
  .limit(10)
  .find();

// Stream with real-time updates
realtime.collection('users')
  .where('status', isEqualTo: 'active')
  .stream
  .listen((documents) {
    print('Active users: $documents');
  });

// Stream typed values directly
realtime.collection<User>(
  'users',
  fromJson: (json) => User.fromJson(json), // Required: without it, streamWithValue throws a state error
)
  .where('status', isEqualTo: 'active')
  .streamWithValue
  .listen((users) {
    print('Active user names: ${users.map((user) => user.name).toList()}');
  });

// `or` queries
// All clauses inside the `or` builder are combined with OR, and the result is combined with the rest of the query with AND
realtime.collection('users')
  .where('age', isGreaterThan: 18)
  .or((q) {
    // Together with the age filter above, this matches adult admins OR adults with an active status
    q.where('role', isEqualTo: 'admin');
    q.where('status', isEqualTo: 'active');
  })
  .stream.listen((documents) {
    print('Adult users who are admins or active: $documents');
  });
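
The AND/OR combination described in the `or` example can be checked against plain data. The sketch below is standalone Dart with made-up sample documents; the `matchesQuery` function is hypothetical and simply spells out the predicate `age > 18 AND (role == 'admin' OR status == 'active')` that the query above expresses.

```dart
typedef Doc = Map<String, Object?>;

/// Hypothetical predicate, NOT part of mongo_realtime.
/// Mirrors: where(age > 18) AND (role == 'admin' OR status == 'active').
bool matchesQuery(Doc user) {
  final isAdult = (user['age'] as int) > 18;
  final isAdmin = user['role'] == 'admin';
  final isActive = user['status'] == 'active';
  return isAdult && (isAdmin || isActive);
}

void main() {
  // Made-up sample data for illustration only.
  final users = <Doc>[
    {'name': 'Ana', 'age': 30, 'role': 'admin', 'status': 'inactive'},
    {'name': 'Ben', 'age': 17, 'role': 'admin', 'status': 'active'},
    {'name': 'Cy', 'age': 25, 'role': 'member', 'status': 'active'},
    {'name': 'Dee', 'age': 40, 'role': 'member', 'status': 'inactive'},
  ];

  final names = users.where(matchesQuery).map((u) => u['name']).toList();
  print(names); // [Ana, Cy]
}
```

Ben is excluded because the age filter applies to every branch of the OR group, and Dee is excluded because neither OR clause matches.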

Type-safe document mapping

class User {
  final String id;
  final String name;
  final int age;

  User.fromJson(Map<String, dynamic> json)
    : id = json['_id'],
      name = json['name'],
      age = json['age'];
}

// Map documents to User objects
final users = await realtime.collection<User>('users',
  fromJson: (json) => User.fromJson(json),
).find();

for (final doc in users) {
  print('${doc.value?.name} (${doc.value?.age})');
}

// Or listen to typed values directly
realtime.collection<User>(
  'users',
  fromJson: (json) => User.fromJson(json),
).streamWithValue.listen((users) {
  print(users.map((user) => user.name).toList());
});

Using a custom document id field

By default, document references use MongoDB's _id field. If your collection uses another unique field such as email, slug, or uuid, provide idField when creating the collection reference.

final users = realtime.collection<User>(
  'users',
  idField: 'email',
  fromJson: (json) => User.fromJson(json),
);

final user = users.doc('alice@example.com');

final current = await user.find();

user.streamWithValue.listen((value) {
  print('Updated user: ${value?.name}');
});

await user.update(
  $set: {'name': 'Alice'},
  optimistic: true,
);

user.watch.onUpdate((change) {
  print('User changed: ${change.docId}');
});

// Override the collection idField for one specific document reference
final userBySlug = users.doc('alice-profile', idField: 'slug');
await userBySlug.delete();

Insert, update, delete

final collection = realtime.collection('users');

// Insert
await collection.insert({'name': 'John', 'age': 30});

// Update
await collection.update(
  $set: {'age': 31},
  filter: {'name': 'John'},
);
// or
await collection.where('name', isEqualTo: 'John').update($set: {'age': 31});

// Delete
await collection.delete(filter: {'name': 'John'});

// Delete using the current query filter
await collection.where('inactive', isEqualTo: true).delete();

Document-specific operations

final doc = realtime.collection('users').doc('user_123');

// Get a specific document
final user = await doc.find();

// Listen to changes on a specific document
doc.stream.listen((user) {
  print('User updated: $user');
});

// Listen to the typed value of a specific document
realtime.collection<User>(
  'users',
  fromJson: (json) => User.fromJson(json),
).doc('user_123').streamWithValue.listen((user) {
  print('Typed user updated: ${user?.name}');
});

// Update a specific document
await doc.update($set: {'age': 25});

// Delete a specific document
await doc.delete();

Database change watchers

// Listen to all changes
realtime.watch.onChange((change) {
  print('Changed: ${change.docId} in ${change.collection}');
});

// Listen to specific collection
realtime.collection('users').watch.onInsert((change) {
  print('New user: ${change.docId}');
});

// Listen to specific document
realtime.collection('users').doc('123').watch.onUpdate((change) {
  print('User 123 updated');
});

Multiple server instances

final prodServer = MongoRealtime('ws://prod-server:3000');
final devServer = MongoRealtime('ws://dev-server:3000');

// Listen to changes on different servers
prodServer.collection('users').stream.listen(
  (users) => print('Prod users: $users')
);

devServer.collection('users').stream.listen(
  (users) => print('Dev users: $users')
);

Using optimistic updates

final collection = client.collection('users');

// Optimistic insert - update local streams immediately
await collection.insert(
  {'_id': 'new_user', 'name': 'Jane'},
  optimistic: true,
);

// Optimistic update - local cache is updated before server confirmation
await collection.where('name', isEqualTo: 'Jane').update(
  $push: {'roles': 'admin'},
  optimistic: true,
);

// Optimistic delete - matching documents disappear from local streams immediately
await collection.where('name', isEqualTo: 'Jane').delete(
  optimistic: true,
);

// Default is `optimistic: false`, which waits for the server event before updating streams.
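
The difference between the two modes can be sketched with a small in-memory cache. `LocalCache` below is a hypothetical class, not a mongo_realtime type, and the rollback on failure is an assumption made for this illustration; the README does not describe how the package handles a rejected optimistic write.

```dart
import 'dart:async';

typedef Doc = Map<String, Object?>;

/// Hypothetical local cache, NOT part of mongo_realtime.
class LocalCache {
  final List<Doc> _docs = [];

  List<Doc> get docs => List.unmodifiable(_docs);

  /// Inserts [doc] locally and sends it with [sendToServer].
  /// With [optimistic] set, the local list changes before the server answers.
  Future<void> insert(
    Doc doc,
    Future<void> Function(Doc doc) sendToServer, {
    bool optimistic = false,
  }) async {
    if (optimistic) _docs.add(doc);
    try {
      await sendToServer(doc);
      if (!optimistic) _docs.add(doc);
    } on Exception {
      // Assumption for this sketch: undo the local change if the server fails.
      if (optimistic) _docs.remove(doc);
      rethrow;
    }
  }
}

Future<void> main() async {
  final cache = LocalCache();
  // Simulated server round trip.
  Future<void> fakeServer(Doc doc) =>
      Future<void>.delayed(const Duration(milliseconds: 10));

  final pending = cache.insert(
    {'_id': 'new_user', 'name': 'Jane'},
    fakeServer,
    optimistic: true,
  );
  print('before confirmation: ${cache.docs.length}'); // 1
  await pending;
  print('after confirmation: ${cache.docs.length}'); // 1
}
```

With `optimistic: false`, the same call would leave the list empty until `fakeServer` completes.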

API Overview

Main Classes

  • MongoRealtime: Main client for connecting to the realtime server.

    • MongoRealtime.connect(url, authData?, autoReconnectConfig?, onConnected?, onDisconnected?): Connect to a WebSocket server (singleton).
    • MongoRealtime(url, authData?, autoReconnectConfig?, onConnected?, onDisconnected?): Create a new instance.
    • instance: Access the singleton instance after connect().
    • realtime: Global getter for the singleton instance after connect().
    • isConnected: Whether the WebSocket is currently connected.
    • disconnect(): Close the WebSocket without disposing the client.
    • reconnect([url]): Reconnect and resubscribe active queries.
    • autoReconnectConfig: Auto-reconnect behavior for unexpected disconnections.
  • RealtimeReconnectConfig:

    • enabled: Enable or disable auto-reconnect.
    • delay: Delay between reconnect attempts.
    • maxAttempts: Maximum number of reconnect attempts. null means unlimited.

Collection & Document Access

  • collection(name, {fromJson, idField}): Get a collection reference.

    • Returns RealtimeCollectionReference<T>.
    • idField defaults to _id.
  • RealtimeCollectionReference<T>:

    • where(field, ...): Add filter clause (supports isEqualTo, isNotEqualTo, isGreaterThan, isGreaterOrEqualTo, isLowerThan, isLowerOrEqualTo, arrayContains, isIn, matches).
    • or(builder): Add OR clause.
    • sort(field, descending?): Sort by field.
    • limit(count): Limit results.
    • stream: Stream of all matching documents.
    • streamWithValue: Stream matching typed values.
    • find(): Fetch all matching documents once.
    • doc(id, {idField}): Get a document reference.
    • insert(doc, optimistic?): Insert a document.
    • update(..., filter?, optimistic?): Update documents.
    • delete(filter?, optimistic?): Delete documents.
  • doc(id, {idField}): Get a document reference.

    • Returns RealtimeDocumentReference<T>.
  • RealtimeDocumentReference<T>:

    • stream: Stream this document's changes.
    • streamWithValue: Stream the typed value of this document.
    • find(): Fetch this document once.
    • update(..., optimistic?): Update this document.
    • delete(optimistic?): Delete this document.
    • watch: Watch insert/update/delete events for this document.

Query Builder

  • RealtimeQueryBuilder<T>:
    • where(field, ...): Add AND clause with field conditions.
    • or(builder): Add OR clause with multiple conditions.
    • sort(field, descending?): Add sort order.
    • limit(count): Limit number of documents.
    • stream: Get as a broadcast stream.
    • streamWithValue: Get typed values as a broadcast stream.
    • find(): Execute query once.
    • update(..., optimistic?): Update all matching documents.
    • delete(optimistic?): Delete all matching documents.

Database Watchers

  • DbWatcher: Watch for database changes.
    • onChange(handler): Listen to any change.
    • onInsert(handler): Listen to inserts only.
    • onUpdate(handler): Listen to updates only.
    • onDelete(handler): Listen to deletes only.
    • offChange(handler): Unsubscribe from changes.
    • offInsert(handler): Unsubscribe from inserts.
    • offUpdate(handler): Unsubscribe from updates.
    • offDelete(handler): Unsubscribe from deletes.
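
Because the `off…` methods take the handler as their argument, it is reasonable to assume they match on the function reference passed to the corresponding `on…` method. The standalone sketch below illustrates that pattern with a hypothetical `InsertWatcher` class (not the package's `DbWatcher`), which is why keeping a named handler around is useful if you plan to unsubscribe later.

```dart
typedef ChangeHandler = void Function(String docId);

/// Hypothetical watcher, NOT the package's DbWatcher.
class InsertWatcher {
  final List<ChangeHandler> _handlers = [];

  int get handlerCount => _handlers.length;

  void onInsert(ChangeHandler handler) => _handlers.add(handler);

  /// Removes [handler] if that exact function was registered earlier.
  void offInsert(ChangeHandler handler) => _handlers.remove(handler);

  void emit(String docId) {
    // Iterate over a copy so handlers may unsubscribe while being called.
    for (final handler in List.of(_handlers)) {
      handler(docId);
    }
  }
}

// A named top-level function gives a stable reference for unsubscribing.
void logInsert(String docId) => print('Inserted: $docId');

void main() {
  final watcher = InsertWatcher();

  watcher.onInsert(logInsert);
  watcher.emit('user_1'); // Inserted: user_1

  watcher.offInsert(logInsert);
  watcher.emit('user_2'); // nothing is printed
  print(watcher.handlerCount); // 0
}
```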

Return Types

  • RealtimeDocument<T>: A document with metadata.

    • id: Document ID.
    • data: Raw document map.
    • value: Typed value (if fromJson was provided).
  • RealtimeDbChange: A database change event.

    • type: Insert, update, delete, or change.
    • collection: Collection name.
    • docId: Document ID.
    • fullDocument: Full document (if available).
    • cast<T>(fromJson): Cast to typed model.
    • tryCast<T>(fromJson): Try to cast to typed model.

License

MIT License

Publisher

maxdev.tech (verified publisher)

Dependencies

web_socket_channel
