Messaging

MIT License stars pub version Discord Badge

A dart, flexible and powerful package to connect your components and services in a loosely coupled manner.

Allows you to maximize scalability and responsiveness using mainly the publisher/subscriber pattern.


Installation

This package is a full dart package so it can be used in every platforms supported by dart. To install it, use

  • dart
dart pub add messaging
  • flutter
flutter pub add messaging

Features

Messaging has many features from the basics:

  • Publish: allowing to publish in a loosely coupled manner a Message to their subscribers
  • Subscribe/Unsubscribe: allowing a subscriber to subscribe/unsubscribe to one or many messages.

To more specifics:

  • Priority queue: allowing you to give priority to message.
  • Store: allowing to store message state where you want (in memory or in file or in database).
  • Publish Now: allowing to publish to all subscribers and await the end of the publishing.
  • Guards: allowing to add some guard/filter to your messaging instance to filter messages by time/app state etc.
  • Observers: allowing to observe a message state or messaging instance state.
  • Flexibility: allowing you to customize the interfaces used the package to adapt to your business logic.

Usage

Messaging instantiation

The instantiation could be simple or more complex.

// Simply
final Messaging messaging = Messaging();

// OR 

// Complex/Flexible
final ILogger myLogger = MyLogger();
final MessagingStore store = MyStore();
final Iterable<MessagingGuard> guards = <MessagingGuard>[];
final  Iterable<MessagingObserver> observers = <MessagingObserver>[];
final MessagingQueueFactory messagingQueueFactory = (dispatcher) => MyMessagingQueue(dispatcher: dispatcher, resumeStrategy:
        ResumeQueueStrategy.dispatchPendingMessages);

final Messaging messaging = Messaging(
    logger: myLogger,
    store: store,
    guards: guards,
    observers: observers,
    messagingQueueFactory: messagingQueueFactory,
);

It allows us to publish, subscribe to message, has its own lifecycle and must be started before any message are dispatched.

Lifecycle

  • started means the messaging has been started. Before it is started, you can add subscribers (you can even add after) and publish message but they won't be dispatched to subscribers and will remain in the queue. To start the messaging use
await messaging.start();

If you use it in a flutter application we recommend to start it before the runApp function.

  • stopped means the messaging has been stopped and the queue has been reset.
messaging.stop();

Only the queue is reset, guards, observers are not changed or cleared.

  • paused means the messaging has been paused so every published message won't be dispatched and will remain in the queue.
  • resumed means the messaging has been resumed after a pause or start. All pending messages in the queue will be dispatch following the resumeStrategy of MessagingQueue.

Message implementation

A message is an immutable data structure that you send/publish. It should be an object with its class extending Message base class.

class UserCreateMessage extends Message {
    final User user;

    const UserCreateMessage(this.user): super(priority: 10);   
}

A message can be published multiple times and has a priority. Higher is the priority, more quickly it will be dispatched to subscribers.

Message state

A message has a state where informations about it are saved in the store. The state of a message is accessible through its MessageState class with states:

  • published the message has been published aka it has passed the guards and has been added to the queue.
  • dispatching the message is currently dispatching and the subscriber key that already received the message is saved in the MessageState
  • dispatched the message has been dispatched to all current subscribers.

Message subscription

To subscribe to a message you must have a subscriber or a class that implemented MessagingSubscriber. This class can be service, a component, a widget etc. A subscriber can

  • be implemented like
class UserCreateSubscriber implements MessagingSubscriber {
    @override
    Future<void> onMessage(Message message) {
        if (message is UserCreateMessage) {
            // Do related stuff
        }
    }

    @override
    String get subscriberKey => '$UserCreateSubscriber';
}
  • subscribe like
final UserCreateSubscriber subscriber = UserCreateSubscriber();
messaging.subscribe(subscriber, to: UserCreateMessage);

// OR
messaging.subscribeAll(subscriber, to: <Type>[UserCreateMessage]);
  • unsubscribe like
messaging.unsubscribe(subscriber, to: UserCreateMessage);

// OR
messaging.unsubscribeAll(subscriber); // It will unsubscribe to all subscribed messages

IMPORTANT: A each subscriber should return a unique subscriber key through the getter subscriberKey. Prefer to add all subscribers before starting the messaging so that pending messages in the store could be dispatched to them.

Message publication

You can publish a message from anywhere in your code where you have access to the Messaging instance. A message can be published:

  • in queue
final User user = User(name: 'John Doe');
final UserCreateMessage message = UserCreateMessage(user);
final PublishResult result = messaging.publish(message);
  • immediately
final User user = User(name: 'John Doe');
final UserCreateMessage message = UserCreateMessage(user);
final PublishResult result = await messaging.publishNow(message);

Similarities between in queue and immediately

  • They check that the message is allowed by the guards.
  • They inform the observers.
  • They save the message in the store.
  • They return a PublishResult.

Differences between in queue and immediately

  • in queue is synchronous and immediately is asynchronous.
  • in queue added the message in queue and immediately directly dispatches the message to subscribers so doesn't add it in the queue.

Result of publication

The methods publish and publishNow returns a PublishResult that allows to know if the publication succeed or not. If the publication was not allowed by the guards, the result will be a GuardPublishResult and if it failed for another reason (for example a subscriber throws an error and you pass strategy of publishNow to PublishNowErrorHandlingStrategy.breakDispatch), it will be a FailedPublishResult.

Observe

It is possible to observe many changes in the package through an observer that will be inform for specific operations that occurred. It is possible to create an observer like

class MyObserver extends MessagingObserver {
  @override
  void onPrePublish(Message message) {
    // Informed before message is published
    super.onPrePublish(message);
  }

  @override
  void onDispatchFailed(Message message, Object error, {MessagingSubscriber? subscriber, StackTrace? trace}) {
    // Informed when an error occurred during dispatching (to an subscriber or not) 
    super.onDispatchFailed(message, error, subscriber, trace);
  }

  @override
  void onMessagingStateChanged(MessagingState state) {
    // Informed when the state of the messaging instance changed
    super.onMessagingStateChanged(state);
  }

  @override
  void onNotAllowed(Message message, MessagingGuard guard, MessagingGuardResponse response) {
    // Informed when a message is not allowed by a guard
    super.onNotAllowed(message, guard, response);
  }

  @override
  void onPostDispatch(Message message) {
    // Informed after the message is dispatched
    super.onPostDispatch(message);
  }

  @override
  void onPostPublish(Message message) {
    // Informed after the message is published
    super.onPostPublish(message);
  }

  @override
  void onPreDispatch(Message message) {
    // Informed before the message is dispatched
    super.onPreDispatch(message);
  }

  @override
  void onPublishFailed(Message message, Object error, {StackTrace? trace}) {
    // Informed when publication of a message failed for any other reason but the guard
    super.onPublishFailed(message, error, trace);
  }

  @override
  void onSaved(Message message) {
    // Informed when the message is saved in the store
    super.onSaved(message);
  }
}

Then you can add your observer in two ways:

  • In instantiation of your Messaging.
  • Adding to the observers property like messaging.observers.add(MyObserver()).

Guard

It is also possible to add a guard to allow you to filter message that should not be publish. It could be implemented like

class OnceMessageGuard implements MessagingGuard, MessagingObserver {
  static const List<Type> messageTypePublishableOnce = [
    AppLaunchMessage,
    RefreshTokenMessage
  ];
  final List<Type> _messageTypeAlreadyPublishedOnce = [];

  @override
  MessagingGuardResponse can(Message message, Messaging messaging) {
    if (messageTypePublishableOnce.contains(message.runtimeType)) {
      if (_messageTypeAlreadyPublishedOnce.contains(message.runtimeType)) {
        return const NotAllowedMessagingGuardResponse();
      } else {
        _messageTypeAlreadyPublishedOnce.add(message.runtimeType);
      }
    }

    return const AllowedMessagingGuardResponse();
  }

  @override
  void onPublishFailed(Message message, Object error, {StackTrace? trace}) {
    _checkAndRemove(message);
  }

  @override
  void onNotAllowed(Message message, Object error, {StackTrace? trace}) {
    _checkAndRemove(message);
  }

  void _checkAndRemove(Message message) {
    if (messageTypePublishableOnce.contains(message.runtimeType)) {
      _messageTypeAlreadyPublishedOnce.remove(message.runtimeType);
    }
  }
}

Yes your guard can also be an observer. Just be sure to use the same instance ^^. Then you can add your observer in two ways:

  • In instantiation of your Messaging.
  • Adding to the observers property like messaging.guards.add(OnceMessageGuard()).

Customization

Queue customization

All published messages are added to queue which is a MessagingQueue using their generated unique key and this one is responsable to dispatch a message to subscribers through the messaging api. The implementation (so the behavior) of the queue can be different on your needs. You can create your own implementation by extending/implementing the MessagingQueue class and give your implementation through the MessagingQueueFactory parameter of Messaging or use the existing ones:

  • TimerMessagingQueue that uses an internal Timer to dispatch message at interval of time.
  • SyncMessagingQueue that dispatches messages directly when they are published to the queue.

IMPORTANT: If you extend MessagingQueue to dispatch a message you just have to call dispatchQueuedItem() method. The method onItemAddedToQueue is called every time a new item/message is added/published to the queue.

Store customization

Before being published and after being allowed by the guards, the message is saved in the store. This store is also used to get message by their generated key before dispatching it. You can implement your own store by extending/implementing MessagingStore or you can use the implemented MessagingMemoryStore that saved messages in memory.

Logger customization

The logger is only used to log operation made. The default implementation use logger package internally and can be configured through the logConfig parameter or you can use your own implementation.

Additional information

There's many others functionalities that'll come in the next version so i am open to contributions or simply discussion about the current implementation.

TODOs

  • Add group/channel of message and allow to subscribe to it
  • Integrate
    • Idempotent messages
    • Poison messages
    • Repeated messages
    • Message expiration
    • Message scheduling

Libraries

messaging
Messaging package