harvest 2.0.0
harvest: ^2.0.0 copied to clipboard
Event store for Dart with multiple backends
Harvest #
Harvest is a messagebus, CQRS frame work and event store for Dart with multiple backends. Features include
- Message bus
- Synchronous and asynchronous delivery
- Support for both "fire-and-forget" as well as delivery notication
- Optional Dart Stream/Sink interface
- Intercept dead messages (i.e. messages with no listeners)
- Listen to all published events regardless of type
- Enrich messages prior to their delivery
- Event store
- Create, persist and manipulate streams of events independently of the event storage type
- Simple API for supporting differnt storage backends for events
- Supports file, IndexedDB and memory backends
- Support for event sourced entities
- CQRS and SAGA support
MessageBus #
You can use the message bus standalone or in conjuction with the event store. Message delivery can be synchronous or asynchronous
// Synchronous messagebus
var syncMessageBus = new MessageBus()
// Asynchronous messagebus
var asyncmessageBus = new MessageBus.async()
the message bus support both "fire-and-forget" as well as delivery notications
// subcribe to MyMessage
messageBus.subscribe(MyMessage, (MyMessage msg) => print(msg));
// Publish message and continue immediately
messageBus.publish(myMessage);
// Publish message and await until it has been handled by all subscribers
int deliveredTo = await messageBus.publish(myMessage);
If you prefer the Dart Stream/Sink interface HArvest supports this as well
// Get a stream for MyMessage
var stream = messageBus.stream(MyMessage);
// Listen to stream
stream.listen((MyMessage myMessage) => print("recieved message $myMessage"));
// Get sink for MyMessage
var sink = messageBus.sink(MyMessage);
// Use sink to dispatch event
sink.add(new MyMessage("a message"));
Harvest exposes a hook for intercepting messages that has no listeners
// Handler invoked when a message with no subscribers is published
messageBus.deadMessageHandler = (Message msg) => print("no handler for ${msg.runtimeType}");
You can also listen to all events regardless of types
messageBus.everyMessage.listen((Message msg) => print("message ${msg.runtimeType} published");
If you want to intercept all messages prior to delivery you can use Harvest message enricher API
// Enricher that stores the current user id in the messages header
messageBus.enricher = (Message m) {
m.headers["userId"] = StaticSessionData.userId;
};
Event store #
Harvest event store can be used for simply storing events which can later be retrieved
import 'package:harvest/harvest.dart';
main() async {
var streamId = new Guid();
var eventStore = new MemoryEventStore();
// get a event stream for streamId
eventStream = await eventStore.openStream(streamId);
// create some events
var event1 = ...
var event2 = ...
// store them
eventStream.addAll([event1, event2]);
eventStream.commitChanges();
}
Various backends are included in the standard Harvest distribution
// Harvest FileEventStore that stores events in JSON files
import 'package:harvest/harvest_file.dart';
var eventStore = new FileEventStore("path/to/file.txt");
// Harvest IdbEventStore that stores events in IndexdDB databases
import 'package:harvest/harvest_idb.dart';
var eventStore = new IdbEventStore("idb_database");
new EventStores can be created by implementing the EventStore and EventStream APIs
Event sourcing #
Fully fletched event sourced applications are also supported. Event sourcing is the concept of saving and retriving objects by the events that occured on them rather than by their state. Consider the following bank account use case:
- User creates account
- User deposits 10$
- User withdraws 2$
In a CRUD application you would now have a BankAccount object with an amount property with the value 8. In a event sourced application you have a BankAccount object and 3 events for it
- AccountCreated
- AmountDeposited
- AmountWithdrawn
Where is this useful?
- For certain applications the eventlog can be useful in itself such as a audit trail in a financial system.
- It can help manage complexity in large applications by forcing programmers to make event types for every action that can occur.
- It makes debugging easy since you can replay the event log to recreate any former system state where an error occurred.
- It makes mobile app synchronization a breeze, since the offline app can just queue up events and replay them on the backend once it comes online.
- In applications using the CQRS architecture pattern.
For more information, see the provided example application.
Links #
TODO #
- Test and document usage of SAGAs (process manager)
- Ensure message headers are not serialized
- Enable CQRS event reloading test
- Create serilization interface that allows switching between mirror, transformation and manual serilizations