eventador 2.1.0
eventador: ^2.1.0 copied to clipboard
Persistence & Event Sourcing Extension for Dactor - Industrial-grade event sourcing platform with hybrid architecture
Eventador #
Eventador is an industrial-grade persistence and event sourcing extension for the Dactor actor system. It provides durable state management, event sourcing capabilities, and CBOR-based serialization using Isar database as the storage backend.
๐ Features #
Core Event Sourcing #
- Persistent Actors: Actors that survive system restarts with automatic state recovery
- Event Sourcing: Complete command โ event โ state pipeline with audit trails
- CBOR Serialization: Efficient binary serialization for optimal storage performance
- Snapshot System: Configurable snapshots for fast recovery and performance optimization
- Aggregate Root Pattern: Domain-driven design patterns for complex business logic
Advanced Patterns #
- Saga Pattern: Long-running distributed transactions with compensation actions
- Event Projections: Real-time read models and materialized views
- Projection Actors: Pekko-style actor-based projection runtime (
ProjectionActor) with anAwaitEventAppliedprimitive for CQRS coordinators - Command/Event/State Abstractions: Type-safe event sourcing patterns
- Optimistic Concurrency Control: Version-based conflict resolution
Hybrid Architecture #
Eventador leverages a powerful hybrid architecture combining:
- Dactor: High-performance actor model with supervision and messaging
- Isar: Direct permanent event storage for immutable event logs
- DuraQ: Operational workflows for sagas, projections, and command processing
This separation ensures permanent audit trails in Isar while handling transient operational workflows through DuraQ.
๐ฆ Installation #
Add Eventador to your pubspec.yaml:
dependencies:
eventador: ^1.0.0
isar: ^3.1.0+1
cbor: ^6.0.0
# Local dependencies for hybrid architecture
dactor:
path: ../dactor # Actor model foundation
duraq:
path: ../duraq # Operational workflows
dev_dependencies:
isar_generator: ^3.1.0+1
build_runner: ^2.4.7
๐โโ๏ธ Quick Start #
1. Basic Persistent Actor #
import 'package:eventador/eventador.dart';
// Define your commands
class IncrementCommand extends Command {
final String aggregateId;
final int value;
IncrementCommand({required this.aggregateId, required this.value});
}
// Define your events
class CounterIncrementedEvent extends Event {
final String aggregateId;
final int value;
CounterIncrementedEvent({required this.aggregateId, required this.value});
}
// Create a persistent actor
class CounterActor extends PersistentActor {
int _value = 0;
CounterActor({
required String persistenceId,
required EventStore eventStore,
}) : super(persistenceId: persistenceId, eventStore: eventStore);
int get value => _value;
@override
Future<void> commandHandler(Command command) async {
if (command is IncrementCommand) {
final event = CounterIncrementedEvent(
aggregateId: persistenceId,
value: command.value,
);
await persistEvent(event);
}
}
@override
void eventHandler(Event event) {
if (event is CounterIncrementedEvent) {
_value += event.value;
}
}
@override
Future<dynamic> getSnapshotState() async => {'value': _value};
@override
Future<void> onSnapshot(dynamic snapshotState, int sequenceNumber) async {
if (snapshotState is Map<String, dynamic>) {
_value = snapshotState['value'] as int? ?? 0;
}
}
}
2. Initialize and Use #
Future<void> main() async {
// Initialize Isar for event storage
await Isar.initializeIsarCore(download: true);
final eventStore = await IsarEventStore.create(directory: './data');
// Create and start the actor
final counter = CounterActor(
persistenceId: 'counter-1',
eventStore: eventStore,
);
// Start the actor (triggers recovery)
counter.preStart();
await Future.delayed(Duration(milliseconds: 100));
// Send commands
await counter.onMessage(IncrementCommand(aggregateId: 'counter-1', value: 5));
await counter.onMessage(IncrementCommand(aggregateId: 'counter-1', value: 3));
print('Counter value: ${counter.value}'); // Output: Counter value: 8
// Create snapshot for fast recovery
await counter.createSnapshot();
// Clean up
await eventStore.close();
}
๐๏ธ Architecture #
Hybrid Architecture Benefits #
โโโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ
โ Dactor โ โ Eventador โ โ DuraQ โ
โ Actors โโโโโถโ Event Store โโโโโถโ Operational โ
โ โ โ (Isar Direct) โ โ Queues โ
โ โข PersistentActorโ โ โ โ โ
โ โข Supervision โ โ โข Events (โ) โ โ โข Commands โ
โ โข Metrics โ โ โข Snapshots โ โ โข Sagas โ
โ โข Ask Pattern โ โ โข Recovery โ โ โข Projections โ
โโโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ
Component Responsibilities:
- Dactor: Actor model foundation with supervision, messaging, and fault tolerance
- Isar: Permanent event storage with immutable audit trails and fast queries
- DuraQ: Reliable operational workflows with retry policies and dead letter queues
๐ Advanced Usage #
Two Patterns for Command/Event Handling in Aggregates #
Eventador provides two distinct approaches for handling commands and events in your AggregateRoot implementations. Understanding both patterns helps you choose the right approach for your use case.
Pattern 1: Override Pattern (Recommended for Most Cases)
Override the handleCommand() and eventHandler() methods directly. This pattern is type-safe, explicit, and IDE-friendly.
class BankAccountAggregate extends AggregateRoot<BankAccountState> {
BankAccountAggregate({
required String persistenceId,
required EventStore eventStore,
}) : super(persistenceId: persistenceId, eventStore: eventStore);
@override
BankAccountState createInitialState() => BankAccountState(
accountId: persistenceId,
balance: 0.0,
isActive: false,
version: 0,
lastModified: DateTime.now(),
);
// OVERRIDE: Command Handler
@override
Future<List<Event>> handleCommand(BankAccountState currentState, Command command) async {
return switch (command.runtimeType) {
OpenAccountCommand => _handleOpenAccount(currentState, command as OpenAccountCommand),
DepositMoneyCommand => _handleDeposit(currentState, command as DepositMoneyCommand),
WithdrawMoneyCommand => _handleWithdraw(currentState, command as WithdrawMoneyCommand),
_ => throw ArgumentError('Unknown command type: ${command.runtimeType}'),
};
}
// OVERRIDE: Event Handler
@override
void eventHandler(Event event) {
// CRITICAL: Ensure state is initialized during recovery
ensureStateInitialized();
if (event is! BankAccountEvent) {
throw ArgumentError('Expected BankAccountEvent, got ${event.runtimeType}');
}
switch (event.runtimeType) {
case AccountOpenedEvent:
_applyAccountOpened(event as AccountOpenedEvent);
break;
case MoneyDepositedEvent:
_applyMoneyDeposited(event as MoneyDepositedEvent);
break;
case MoneyWithdrawnEvent:
_applyMoneyWithdrawn(event as MoneyWithdrawnEvent);
break;
default:
throw ArgumentError('Unknown event type: ${event.runtimeType}');
}
}
// Command handlers (return events)
List<Event> _handleOpenAccount(BankAccountState state, OpenAccountCommand cmd) {
if (state.isActive) {
throw StateError('Account is already open');
}
return [AccountOpenedEvent(
accountId: cmd.accountId,
initialBalance: cmd.initialBalance,
)];
}
List<Event> _handleDeposit(BankAccountState state, DepositMoneyCommand cmd) {
if (!state.isActive) {
throw StateError('Account is not active');
}
return [MoneyDepositedEvent(
accountId: cmd.accountId,
amount: cmd.amount,
)];
}
// Event appliers (mutate currentState directly)
void _applyAccountOpened(AccountOpenedEvent event) {
currentState.isActive = true;
currentState.balance = event.initialBalance;
currentState.version++;
currentState.lastModified = event.timestamp;
}
void _applyMoneyDeposited(MoneyDepositedEvent event) {
currentState.balance += event.amount;
currentState.version++;
currentState.lastModified = event.timestamp;
}
}
โ Advantages of Override Pattern:
- Type Safety: Compiler catches missing event handlers
- IDE Support: Jump-to-definition, refactoring tools work perfectly
- Explicit: Clear switch statement shows all supported commands/events
- Debugging: Easy to set breakpoints and trace execution
- Validation: Can use pattern matching (switch expressions) in Dart 3+
โ Disadvantages:
- More boilerplate for simple aggregates
- Requires explicit switch/case for each command/event type
When to Use:
- Production applications with complex business logic
- When you need strong type safety and validation
- When working in a team (explicit contracts are clearer)
- When you have more than 3-5 commands or events
Pattern 2: Registry Pattern (Convenience for Simple Cases)
Register command and event handlers in maps. This pattern is concise and dynamic but less type-safe.
class SimpleCounterAggregate extends AggregateRoot<CounterState> {
SimpleCounterAggregate({
required String persistenceId,
required EventStore eventStore,
}) : super(persistenceId: persistenceId, eventStore: eventStore) {
// Register command handlers in constructor
commandHandlers[IncrementCommand] = _handleIncrement;
commandHandlers[DecrementCommand] = _handleDecrement;
// Register event handlers in constructor
eventHandlers[CounterIncrementedEvent] = _applyIncrement;
eventHandlers[CounterDecrementedEvent] = _applyDecrement;
}
@override
CounterState createInitialState() => CounterState(
counterId: persistenceId,
value: 0,
version: 0,
lastModified: DateTime.now(),
);
// Command handlers registered in map
Future<List<Event>> _handleIncrement(CounterState state, Command command) async {
final cmd = command as IncrementCommand;
return [CounterIncrementedEvent(
counterId: cmd.counterId,
incrementBy: cmd.incrementBy,
)];
}
Future<List<Event>> _handleDecrement(CounterState state, Command command) async {
final cmd = command as DecrementCommand;
if (state.value - cmd.decrementBy < 0) {
throw StateError('Counter cannot go negative');
}
return [CounterDecrementedEvent(
counterId: cmd.counterId,
decrementBy: cmd.decrementBy,
)];
}
// Event handlers registered in map
void _applyIncrement(Event event) {
final evt = event as CounterIncrementedEvent;
currentState.value += evt.incrementBy;
currentState.version++;
currentState.lastModified = evt.timestamp;
}
void _applyDecrement(Event event) {
final evt = event as CounterDecrementedEvent;
currentState.value -= evt.decrementBy;
currentState.version++;
currentState.lastModified = evt.timestamp;
}
}
โ Advantages of Registry Pattern:
- Concise: Less boilerplate for simple aggregates
- Dynamic: Can register handlers at runtime if needed
- Composable: Can share handlers across multiple aggregates
โ Disadvantages:
- Runtime Errors: Missing handlers only discovered when command is sent
- Less Type Safety: Requires manual casting in handler methods
- Harder to Debug: No explicit list of supported commands/events
- IDE Limitations: Jump-to-definition doesn't work as well
When to Use:
- Simple aggregates with few commands/events (2-3)
- Prototyping or rapid development
- When handler logic is shared across aggregates
- Educational examples or tutorials
Critical: Event Handler State Initialization #
Regardless of which pattern you use, if you override eventHandler(), you MUST call ensureStateInitialized() at the start of your event handler. This ensures state is initialized during recovery when events are replayed.
@override
void eventHandler(Event event) {
// CRITICAL: Call this before processing events
ensureStateInitialized();
// Then process your events...
switch (event.runtimeType) {
// ...
}
}
Why this is required:
- During recovery, Eventador replays events from storage to reconstruct state
- The first event might arrive before state is initialized
- Without this call, you'll get
Null check operator used on a null valueerrors - The base class normally handles this, but overriding bypasses that logic
This is NOT required for the Registry Pattern - the base class handles initialization automatically when using the eventHandlers map.
Comparison Table #
| Feature | Override Pattern | Registry Pattern |
|---|---|---|
| Type Safety | โ Strong | โ ๏ธ Runtime only |
| IDE Support | โ Excellent | โ ๏ธ Limited |
| Boilerplate | โ ๏ธ More verbose | โ Concise |
| Debugging | โ Easy | โ ๏ธ Harder |
| Runtime Flexibility | โ Static | โ Dynamic |
| Production Ready | โ Yes | โ ๏ธ Simple cases only |
| State Init | โ ๏ธ Manual ensureStateInitialized() |
โ Automatic |
Mixing Patterns (Not Recommended) #
You can technically mix both patterns, but this is not recommended as it creates confusion:
// โ DON'T DO THIS - Pick one pattern and stick with it
class MixedAggregate extends AggregateRoot<MyState> {
MixedAggregate(...) : super(...) {
commandHandlers[SomeCommand] = _handleSome; // Registry pattern
}
@override
Future<List<Event>> handleCommand(...) async {
// Override pattern - this takes precedence over registry!
// Your registered handlers in commandHandlers will be ignored
}
}
Important: If you override handleCommand(), the commandHandlers map is ignored. Same for eventHandler() and eventHandlers map. The override always takes precedence.
Recommendation #
For production applications, use the Override Pattern:
- It's explicit, type-safe, and maintainable
- Small amount of extra boilerplate is worth the safety
- Your future self (and team members) will thank you
For simple examples or prototypes, Registry Pattern is fine:
- Quick to write and understand
- Good for educational purposes
- Acceptable for aggregates with 2-3 commands
Aggregate Root Pattern #
class BankAccountState extends State {
final String accountId;
final double balance;
final bool isActive;
BankAccountState({
required this.accountId,
required this.balance,
required this.isActive,
required super.version,
required super.lastModified,
});
}
class BankAccountAggregate extends AggregateRoot<BankAccountState> {
BankAccountAggregate({
required String persistenceId,
required EventStore eventStore,
}) : super(persistenceId: persistenceId, eventStore: eventStore);
@override
BankAccountState get initialState => BankAccountState(
accountId: persistenceId,
balance: 0.0,
isActive: false,
version: 0,
lastModified: DateTime.now(),
);
@override
List<Event> handleCommand(BankAccountState currentState, Command command) {
if (command is OpenAccountCommand && !currentState.isActive) {
return [AccountOpenedEvent(
accountId: command.accountId,
initialBalance: command.initialBalance,
)];
} else if (command is DepositMoneyCommand && currentState.isActive) {
return [MoneyDepositedEvent(
accountId: command.accountId,
amount: command.amount,
)];
}
return [];
}
@override
BankAccountState applyEvent(BankAccountState currentState, Event event) {
if (event is AccountOpenedEvent) {
return currentState.copyWith(
balance: event.initialBalance,
isActive: true,
version: currentState.version + 1,
);
} else if (event is MoneyDepositedEvent) {
return currentState.copyWith(
balance: currentState.balance + event.amount,
version: currentState.version + 1,
);
}
return currentState;
}
}
Saga Pattern for Distributed Transactions #
// Define saga state
class OrderSagaState extends SagaState {
final String orderId;
final String customerId;
final double amount;
final bool paymentProcessed;
final bool orderShipped;
OrderSagaState({
required this.orderId,
required this.customerId,
required this.amount,
required super.status,
required super.lastUpdated,
this.paymentProcessed = false,
this.orderShipped = false,
});
@override
OrderSagaState copyWith({
SagaStatus? status,
DateTime? lastUpdated,
bool? paymentProcessed,
bool? orderShipped,
}) {
return OrderSagaState(
orderId: orderId,
customerId: customerId,
amount: amount,
status: status ?? this.status,
lastUpdated: lastUpdated ?? this.lastUpdated,
paymentProcessed: paymentProcessed ?? this.paymentProcessed,
orderShipped: orderShipped ?? this.orderShipped,
);
}
}
// Define saga implementation
class OrderSaga extends Saga {
late OrderSagaState _state;
OrderSaga({
required String persistenceId,
required EventStore eventStore,
required QueueManager duraqManager,
}) : super(
persistenceId: persistenceId,
eventStore: eventStore,
duraqManager: duraqManager,
);
@override
SagaState get sagaState => _state;
@override
Future<void> startSaga(Command initialCommand) async {
if (initialCommand is CreateOrderCommand) {
_state = OrderSagaState(
orderId: initialCommand.orderId,
customerId: initialCommand.customerId,
amount: initialCommand.amount,
status: SagaStatus.running,
lastUpdated: DateTime.now(),
);
await saveSagaState();
// Send command to payment processor using DuraQ
await sendCommand('payment_processor', ProcessPaymentCommand(
orderId: _state.orderId,
amount: _state.amount,
));
}
}
@override
Future<void> handleSagaEvent(Event event) async {
if (event is PaymentProcessedEvent) {
_state = _state.copyWith(
paymentProcessed: true,
lastUpdated: DateTime.now(),
);
await saveSagaState();
// Continue with shipping
await sendCommand('shipping_processor', ShipOrderCommand(
orderId: _state.orderId,
address: '123 Main St', // In real implementation, get from order
));
} else if (event is OrderShippedEvent) {
_state = _state.copyWith(
orderShipped: true,
status: SagaStatus.completed,
lastUpdated: DateTime.now(),
);
await saveSagaState();
print('Order ${_state.orderId} completed successfully!');
} else if (event is OrderFailedEvent) {
_state = _state.copyWith(
status: SagaStatus.failed,
lastUpdated: DateTime.now(),
);
await saveSagaState();
// Start compensation
await compensate([event]);
}
}
@override
Future<void> compensate(List<Event> eventsToCompensate) async {
_state = _state.copyWith(
status: SagaStatus.compensating,
lastUpdated: DateTime.now(),
);
await saveSagaState();
// Implement compensation logic
for (final event in eventsToCompensate.reversed) {
if (event is PaymentProcessedEvent) {
await sendCommand('payment_processor', RefundPaymentCommand(
orderId: _state.orderId,
amount: _state.amount,
));
}
}
}
@override
Future<void> commandHandler(Command command) async {
if (command is CreateOrderCommand) {
await startSaga(command);
}
// Handle other saga-specific commands
}
@override
void eventHandler(Event event) {
// Route saga events to handleSagaEvent
handleSagaEvent(event);
}
// Schedule timeout for saga steps
Future<void> schedulePaymentTimeout() async {
await scheduleTimeout(
Duration(minutes: 5),
'payment-timeout-${_state.orderId}',
);
}
}
### Saga Orchestration
The `SagaCoordinator` is a background processor responsible for dequeuing commands and timeouts from reliable queues and delivering them to the appropriate saga instances. It is not an actor and should not be spawned.
Here is a complete example of how to set up and run a saga, including the `SagaCoordinator`:
```dart
import 'dart:async';
import 'dart:io';
import 'package:dactor/dactor.dart';
import 'package:duraq/duraq.dart';
import 'package:eventador/eventador.dart';
import 'package:isar/isar.dart';
// ... (Saga, Command, and Event definitions from above)
Future<void> main() async {
final tempDir = Directory.systemTemp.createTempSync('isar_test_').path;
// Initialize Isar core
await Isar.initializeIsarCore(download: true);
// Create Isar instance with required schemas
final isar = await Isar.open(
[...IsarStorage.requiredSchemas, ...IsarEventStore.requiredSchemas],
directory: tempDir,
name: 'tmp_queue',
);
final storage = await IsarStorage(isar);
final duraqManager = QueueManager(storage);
// Initialize Eventador with external Isar instance
final eventStore = IsarEventStore(isar);
final actorSystem = LocalActorSystem();
// The SagaCoordinator is a background processor, not an actor.
final sagaCoordinator = SagaCoordinator(duraqManager, eventStore, actorSystem);
unawaited(sagaCoordinator.processSagaCommands());
unawaited(sagaCoordinator.processSagaTimeouts());
// Spawn worker actors
await actorSystem.spawn('payment_processor', () => PaymentProcessor());
await actorSystem.spawn('shipping_processor', () => ShippingProcessor());
// Start the saga
final orderId = 'order_123';
final createOrderCommand = CreateOrder(
aggregateId: orderId,
customerId: 'customer_456',
amount: 99.99,
);
final saga = await actorSystem.spawn(
orderId,
() => OrderSaga(
persistenceId: orderId,
eventStore: eventStore,
duraqManager: duraqManager,
),
);
print('Starting saga for order $orderId...');
saga.tell(createOrderCommand);
// Wait for the saga to complete
await Future.delayed(const Duration(seconds: 5));
// Clean up
print('Shutting down...');
await actorSystem.shutdown();
await isar.close(deleteFromDisk: true);
print('Example finished.');
}
// Example commands and events
class CreateOrderCommand extends Command {
final String orderId;
final String customerId;
final double amount;
CreateOrderCommand({
required this.orderId,
required this.customerId,
required this.amount,
});
}
class ProcessPaymentCommand extends Command {
final String orderId;
final double amount;
ProcessPaymentCommand({
required this.orderId,
required this.amount,
});
}
class ShipOrderCommand extends Command {
final String orderId;
final String address;
ShipOrderCommand({
required this.orderId,
required this.address,
});
}
class RefundPaymentCommand extends Command {
final String orderId;
final double amount;
RefundPaymentCommand({
required this.orderId,
required this.amount,
});
}
class PaymentProcessedEvent extends Event {
final String orderId;
final String transactionId;
PaymentProcessedEvent({
required this.orderId,
required this.transactionId,
});
}
class OrderShippedEvent extends Event {
final String orderId;
final String trackingNumber;
OrderShippedEvent({
required this.orderId,
required this.trackingNumber,
});
}
class OrderFailedEvent extends Event {
final String orderId;
final String reason;
OrderFailedEvent({
required this.orderId,
required this.reason,
});
}
Event Projections for Read Models #
class UserStatisticsProjection extends Projection<UserStatistics> {
UserStatistics _readModel = UserStatistics.empty();
@override
String get projectionId => 'user-statistics';
@override
UserStatistics get readModel => _readModel;
@override
List<Type> get interestedEventTypes => [
UserRegisteredEvent,
UserLoginEvent,
UserLogoutEvent,
UserProfileUpdatedEvent,
];
@override
Future<bool> handle(Event event) async {
switch (event.runtimeType) {
case UserRegisteredEvent:
_handleUserRegistered(event as UserRegisteredEvent);
return true;
case UserLoginEvent:
_handleUserLogin(event as UserLoginEvent);
return true;
case UserLogoutEvent:
_handleUserLogout(event as UserLogoutEvent);
return true;
case UserProfileUpdatedEvent:
_handleUserProfileUpdated(event as UserProfileUpdatedEvent);
return true;
default:
return false;
}
}
void _handleUserRegistered(UserRegisteredEvent event) {
_readModel = _readModel.copyWith(
totalUsers: _readModel.totalUsers + 1,
newUsersToday: _readModel.newUsersToday + 1,
lastUpdated: DateTime.now(),
);
}
void _handleUserLogin(UserLoginEvent event) {
_readModel = _readModel.copyWith(
totalLogins: _readModel.totalLogins + 1,
activeUsersToday: _readModel.activeUsersToday + 1,
lastUpdated: DateTime.now(),
);
}
void _handleUserLogout(UserLogoutEvent event) {
// Calculate session duration
final sessionDuration = event.timestamp.difference(event.loginTime);
final avgDuration = _readModel.averageSessionDuration;
final newAverage = avgDuration != null
? Duration(milliseconds: ((avgDuration.inMilliseconds + sessionDuration.inMilliseconds) / 2).round())
: sessionDuration;
_readModel = _readModel.copyWith(
averageSessionDuration: newAverage,
lastUpdated: DateTime.now(),
);
}
void _handleUserProfileUpdated(UserProfileUpdatedEvent event) {
_readModel = _readModel.copyWith(
profileUpdatesToday: _readModel.profileUpdatesToday + 1,
lastUpdated: DateTime.now(),
);
}
@override
Future<void> rebuild() async {
_readModel = UserStatistics.empty();
// In a real implementation, this would replay all events
}
@override
Future<void> reset() async {
_readModel = UserStatistics.empty();
}
@override
Future<int> getCheckpoint() async {
// This would be loaded from storage
return 0;
}
@override
Future<void> updateCheckpoint(int sequenceNumber) async {
// This would be saved to storage
}
}
// Read model for user statistics
class UserStatistics {
final int totalUsers;
final int newUsersToday;
final int activeUsersToday;
final int totalLogins;
final int profileUpdatesToday;
final Duration? averageSessionDuration;
final DateTime lastUpdated;
const UserStatistics({
required this.totalUsers,
required this.newUsersToday,
required this.activeUsersToday,
required this.totalLogins,
required this.profileUpdatesToday,
this.averageSessionDuration,
required this.lastUpdated,
});
factory UserStatistics.empty() {
return UserStatistics(
totalUsers: 0,
newUsersToday: 0,
activeUsersToday: 0,
totalLogins: 0,
profileUpdatesToday: 0,
lastUpdated: DateTime.now(),
);
}
UserStatistics copyWith({
int? totalUsers,
int? newUsersToday,
int? activeUsersToday,
int? totalLogins,
int? profileUpdatesToday,
Duration? averageSessionDuration,
DateTime? lastUpdated,
}) {
return UserStatistics(
totalUsers: totalUsers ?? this.totalUsers,
newUsersToday: newUsersToday ?? this.newUsersToday,
activeUsersToday: activeUsersToday ?? this.activeUsersToday,
totalLogins: totalLogins ?? this.totalLogins,
profileUpdatesToday: profileUpdatesToday ?? this.profileUpdatesToday,
averageSessionDuration: averageSessionDuration ?? this.averageSessionDuration,
lastUpdated: lastUpdated ?? this.lastUpdated,
);
}
Map<String, dynamic> toMap() {
return {
'totalUsers': totalUsers,
'newUsersToday': newUsersToday,
'activeUsersToday': activeUsersToday,
'totalLogins': totalLogins,
'profileUpdatesToday': profileUpdatesToday,
'averageSessionDurationMs': averageSessionDuration?.inMilliseconds,
'lastUpdated': lastUpdated.toIso8601String(),
};
}
factory UserStatistics.fromMap(Map<String, dynamic> map) {
return UserStatistics(
totalUsers: map['totalUsers'] as int,
newUsersToday: map['newUsersToday'] as int,
activeUsersToday: map['activeUsersToday'] as int,
totalLogins: map['totalLogins'] as int,
profileUpdatesToday: map['profileUpdatesToday'] as int,
averageSessionDuration: map['averageSessionDurationMs'] != null
? Duration(milliseconds: map['averageSessionDurationMs'] as int)
: null,
lastUpdated: DateTime.parse(map['lastUpdated'] as String),
);
}
}
// Example user events
class UserRegisteredEvent extends Event with AggregateEvent, SerializableEvent {
final String userId;
final String email;
final String username;
UserRegisteredEvent({
required this.userId,
required this.email,
required this.username,
});
@override
String get aggregateId => userId;
@override
String get aggregateType => 'User';
@override
Map<String, dynamic> getEventData() {
return {
'userId': userId,
'email': email,
'username': username,
};
}
}
class UserLoginEvent extends Event with AggregateEvent, SerializableEvent {
final String userId;
final String sessionId;
UserLoginEvent({
required this.userId,
required this.sessionId,
});
@override
String get aggregateId => userId;
@override
String get aggregateType => 'User';
@override
Map<String, dynamic> getEventData() {
return {
'userId': userId,
'sessionId': sessionId,
};
}
}
class UserLogoutEvent extends Event with AggregateEvent, SerializableEvent {
final String userId;
final String sessionId;
final DateTime loginTime;
UserLogoutEvent({
required this.userId,
required this.sessionId,
required this.loginTime,
});
@override
String get aggregateId => userId;
@override
String get aggregateType => 'User';
@override
Map<String, dynamic> getEventData() {
return {
'userId': userId,
'sessionId': sessionId,
'loginTime': loginTime.toIso8601String(),
};
}
}
ProjectionActor: Actor-Based Projection Runtime #
Projection is a plain stream description; ProjectionManager drives it via Stream.listen. That works for fire-and-forget read-model updates, but leaves callers with no way to ask "has the read model caught up to this event yet?" in a CQRS-clean way.
ProjectionActor<T> is the actor-based runtime wrapper โ modelled after Apache Pekko / Akka's ProjectionBehavior. It owns the event-stream subscription, drives projection.handle(event), manages checkpoints, and responds to query messages. Spawning is the registration โ there is no separate manager to register with; the actor system itself is the registry, exactly as in Pekko.
import 'package:eventador/eventador.dart';
final system = ActorSystem.create();
final projection = MyUserProjection();
// Spawn the projection actor. The actor name (here 'projection-my-users')
// is its identity in the system โ look it up later via system.getActor(...).
final projectionRef = await system.spawn(
'projection-${projection.projectionId}',
() => ProjectionActor(projection, eventStore, isar: isar),
);
The CQRS Command โ Read-Model Gap
A coordinator that just dispatched a command often needs to observe the resulting read-model change before responding to its caller. Polling read-model storage or writing to the read model directly from the aggregate both violate CQRS. AwaitEventApplied closes the gap:
// Dispatch the command to the aggregate.
final aggregate = system.getActor('user-${userId}');
aggregate!.tell(UpdateUserEmail(userId: userId, newEmail: '...'));
// Wait for the projection to apply the resulting event.
final response = await projectionRef.ask<EventAppliedResponse>(
AwaitEventApplied(
(e) => e is UserEmailUpdated && e.userId == userId,
timeout: Duration(seconds: 5),
),
);
// At this point projection.handle() has returned for the matching event โ
// it is safe to read the projection's read model and trust it reflects the
// command's effect.
If the projection's read model is already up-to-date when the request arrives, pass an alreadySatisfied short-circuit so the actor responds immediately without waiting for another event:
final response = await projectionRef.ask<EventAppliedResponse>(
AwaitEventApplied(
(e) => e is UserEmailUpdated && e.userId == userId,
alreadySatisfied: () => projection.readModel.emailFor(userId) == newEmail,
),
);
A timeout responds with AwaitFailed(reason: 'timeout'). Stopping the actor while requests are pending responds with AwaitFailed(reason: 'stopped').
Lifecycle Messages
| Message | Purpose |
|---|---|
AwaitEventApplied(predicate, timeout, alreadySatisfied) |
Reply once a matching event has been applied |
GetProjectionInfo() |
Returns ProjectionInfoResponse(info) |
PauseProjection() |
Cancels the subscription; actor stays alive |
ResumeProjection() |
Resubscribes from the last processed sequence |
RebuildProjection() |
Calls projection.reset() and replays from sequence 0 |
StopProjection() |
Cancels subscription, flushes checkpoint, replies StoppedAck, then terminates โ Pekko's ProjectionBehavior.Stop equivalent |
Invariants:
- Events are processed sequentially (mailbox-ordered with respect to query messages).
EventAppliedResponseis sent strictly afterprojection.handle()returns for the matching event.- A
handle()failure does not advance the checkpoint and does not resolve pending awaiters โ the projection'sonErrorhook is invoked, and the event will be retried on next subscription.
ProjectionManager continues to work unchanged for callers that don't need request/response semantics. ProjectionActor and ProjectionManager can coexist in the same application.
โ๏ธ Configuration #
Snapshot Configuration #
// Development configuration
final devConfig = SnapshotConfig.development; // Snapshots every 50 events
// Production configuration
final prodConfig = SnapshotConfig.production; // Snapshots every 200 events
// Custom configuration
final customConfig = SnapshotConfig(
eventCountThreshold: 100,
timeThreshold: Duration(minutes: 5),
maxSnapshotsToKeep: 3,
enableCompression: true,
compressionThreshold: 2048,
minTimeBetweenSnapshots: Duration(minutes: 1),
);
// Use with PersistentActor
final actor = MyPersistentActor(
persistenceId: 'my-actor',
eventStore: eventStore,
snapshotConfig: customConfig,
);
External Isar Instance #
For shared database scenarios:
// Create shared Isar instance
final isar = await Isar.open(
[...IsarStorage.requiredSchemas, ...IsarEventStore.requiredSchemas],
directory: '/path/to/shared/db',
name: 'shared_database',
);
// Use with multiple components
final storage = IsarStorage(isar);
final eventStore = IsarEventStore(isar);
final duraqManager = QueueManager(storage);
๐ Event Type Registry (Critical) #
โ ๏ธ IMPORTANT: Before using Eventador with CBOR serialization, you MUST register all your event types with the EventRegistry. This is required for event deserialization from storage after system restarts.
Why Event Registration is Required #
When events are persisted to Isar using CBOR serialization, they are stored as binary data along with their type name. During recovery (e.g., after a system restart), Eventador needs to deserialize these events back into their original Dart objects. The EventRegistry provides the mapping from type names to factory functions that can reconstruct the events.
Without registration, you will get:
ArgumentError: Event type 'YourEvent' not registered
When to Register #
Event types MUST be registered during application initialization, BEFORE:
- Creating any
AggregateRootinstances - Starting
ProjectionManager - Spawning any
PersistentActorinstances
How to Register Events #
import 'package:eventador/eventador.dart';
// During application startup (before any event sourcing operations)
void registerEventTypes() {
// Register each event type with its type name and fromMap factory
EventRegistry.register<AccountOpenedEvent>(
'AccountOpenedEvent',
(map) => AccountOpenedEvent.fromMap(map),
);
EventRegistry.register<MoneyDepositedEvent>(
'MoneyDepositedEvent',
(map) => MoneyDepositedEvent.fromMap(map),
);
EventRegistry.register<MoneyWithdrawnEvent>(
'MoneyWithdrawnEvent',
(map) => MoneyWithdrawnEvent.fromMap(map),
);
// ... register all other event types
}
// Call during app initialization
Future<void> main() async {
// Initialize Isar
await Isar.initializeIsarCore(download: true);
// Register event types FIRST
registerEventTypes();
// Then initialize event store and actor system
final eventStore = await IsarEventStore.create(directory: './data');
final actorSystem = LocalActorSystem();
// Now safe to spawn persistent actors
// ...
}
Event fromMap Requirements #
Each event class must implement a fromMap factory method that can reconstruct the event from a Map:
class AccountOpenedEvent extends Event with AggregateEvent, SerializableEvent {
final String accountId;
final double initialBalance;
AccountOpenedEvent({
required this.accountId,
required this.initialBalance,
String? eventId,
DateTime? timestamp,
int? version,
}) : super(
eventId: eventId ?? Uuid().v4(),
timestamp: timestamp ?? DateTime.now(),
version: version ?? 1,
);
@override
String get aggregateId => accountId;
@override
String get aggregateType => 'BankAccount';
@override
Map<String, dynamic> getEventData() {
return {
'accountId': accountId,
'initialBalance': initialBalance,
};
}
// REQUIRED: Factory method for deserialization
static AccountOpenedEvent fromMap(Map<String, dynamic> map) {
return AccountOpenedEvent(
accountId: map['accountId'] as String,
initialBalance: map['initialBalance'] as double,
eventId: map['eventId'] as String?,
timestamp: map['timestamp'] != null
? (map['timestamp'] is String
? DateTime.parse(map['timestamp'] as String)
: map['timestamp'] as DateTime)
: null,
version: map['version'] as int?,
);
}
}
Complete Example with Registration #
import 'package:eventador/eventador.dart';
import 'package:isar/isar.dart';
// Step 1: Define events with fromMap factories
class WalletCreatedEvent extends Event with AggregateEvent, SerializableEvent {
final String walletId;
final String name;
WalletCreatedEvent({
required this.walletId,
required this.name,
String? eventId,
DateTime? timestamp,
}) : super(
eventId: eventId ?? Uuid().v4(),
timestamp: timestamp ?? DateTime.now(),
);
@override
String get aggregateId => walletId;
@override
String get aggregateType => 'Wallet';
@override
Map<String, dynamic> getEventData() => {
'walletId': walletId,
'name': name,
};
static WalletCreatedEvent fromMap(Map<String, dynamic> map) {
return WalletCreatedEvent(
walletId: map['walletId'] as String,
name: map['name'] as String,
eventId: map['eventId'] as String?,
timestamp: map['timestamp'] != null
? (map['timestamp'] is String
? DateTime.parse(map['timestamp'])
: map['timestamp'] as DateTime)
: null,
);
}
}
// Step 2: Register all event types during initialization
void registerEventTypes() {
print('Registering event types for deserialization...');
EventRegistry.register<WalletCreatedEvent>(
'WalletCreatedEvent',
(map) => WalletCreatedEvent.fromMap(map),
);
// Register ALL your event types here
print('โ Registered ${EventRegistry.registeredTypes.length} event types');
}
// Step 3: Initialize in correct order
Future<void> main() async {
// 1. Initialize Isar
await Isar.initializeIsarCore(download: true);
final isar = await Isar.open([...IsarEventStore.requiredSchemas]);
// 2. Register event types (BEFORE EventStore operations)
registerEventTypes();
// 3. Create EventStore
final eventStore = IsarEventStore(isar);
// 4. Create and start ProjectionManager (if using projections)
final projectionManager = ProjectionManager(eventStore);
// ... register projections ...
await projectionManager.start();
// 5. Spawn aggregates and persistent actors
final actorSystem = LocalActorSystem();
// ... spawn actors ...
print('System initialized successfully');
}
DateTime Field Handling #
Important: When deserializing events from CBOR storage, DateTime fields may be deserialized as either String (ISO-8601) or DateTime objects depending on the serialization layer. Always handle both cases in your fromMap methods:
static MyEvent fromMap(Map<String, dynamic> map) {
return MyEvent(
// Handle DateTime fields that might be String or DateTime
createdAt: map['createdAt'] != null
? (map['createdAt'] is String
? DateTime.parse(map['createdAt'] as String)
: map['createdAt'] as DateTime)
: null,
// ... other fields
);
}
Best Practices #
- Centralize Registration: Create a single
registerEventTypes()function that registers all event types in your application - Register Early: Call registration during app initialization, before any event sourcing operations
- Type Name Consistency: Use the exact runtime type name (e.g.,
'WalletCreatedEvent') as the registration key - Complete Coverage: Register ALL event types used by your aggregates and projections
- Version Support: If using event versioning, register all versions of each event type
Troubleshooting #
Error: Event type 'XYZ' not registered
- Solution: Add
EventRegistry.register<XYZ>('XYZ', (map) => XYZ.fromMap(map))during initialization
Error: type 'String' is not a subtype of type 'DateTime'
- Solution: Update your
fromMapto handle DateTime fields as both String and DateTime (see example above)
Events deserialize with null fields after restart
- Solution: Ensure your
fromMapmethod reads from the correct map keys returned bygetEventData()
๐ Performance #
Eventador is designed for high performance:
- Write Throughput: >10,000 events/second
- Read Throughput: >50,000 events/second
- Recovery Time: <100ms for 1,000 events
- Memory Overhead: <5KB per persistent actor
- Storage Efficiency: <500 bytes per event average
Performance Tips #
- Use Snapshots: Configure appropriate snapshot thresholds for your use case
- Batch Operations: Use
persistEvents()for multiple events - Optimize Serialization: Keep event payloads small and efficient
- Monitor Memory: Use snapshot cleanup to prevent unbounded growth
- Index Strategy: Leverage Isar's indexing for fast queries
๐งช Testing #
Eventador provides comprehensive testing utilities:
import 'package:eventador/eventador.dart';
import 'package:test/test.dart';
void main() {
group('CounterActor Tests', () {
late EventStore eventStore;
late CounterActor counter;
setUp(() async {
eventStore = await IsarEventStore.create(directory: null); // In-memory
counter = CounterActor(
persistenceId: 'test-counter',
eventStore: eventStore,
);
counter.preStart();
await Future.delayed(Duration(milliseconds: 50));
});
tearDown(() async {
await eventStore.close();
});
test('should increment counter', () async {
await counter.onMessage(IncrementCommand(aggregateId: 'test-counter', value: 5));
expect(counter.value, equals(5));
});
test('should recover state after restart', () async {
// Add some events
await counter.onMessage(IncrementCommand(aggregateId: 'test-counter', value: 10));
// Create new instance (simulates restart)
final recoveredCounter = CounterActor(
persistenceId: 'test-counter',
eventStore: eventStore,
);
recoveredCounter.preStart();
await Future.delayed(Duration(milliseconds: 50));
expect(recoveredCounter.value, equals(10));
});
});
}
๐ Examples #
The example/ directory contains comprehensive examples:
- Basic Counter: Simple persistent actor with snapshots
- Order Processing Saga: Complete saga implementation with DuraQ integration
- Banking System: Account aggregates with business rules
- E-commerce Platform: Order processing with projections
- Chat System: Message persistence with read models
๐ ๏ธ Development #
Building #
# Get dependencies
dart pub get
# Generate code (for Isar schemas)
dart run build_runner build
# Run tests
dart test
# Run example
dart run example/eventador_example.dart
Contributing #
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests
- Run
dart testanddart analyze - Submit a pull request
๐ Roadmap #
โ Completed (Phase 1-3) #
- โ Core persistent actor framework
- โ Isar event store with CBOR serialization
- โ Event sourcing patterns (Command/Event/State)
- โ Aggregate root implementation
- โ Snapshot system with configurable policies
- โ Saga pattern with DuraQ integration
- โ Event projections and read models
- โ
Actor-based projection runtime (
ProjectionActor) withAwaitEventAppliedfor CQRS read-model await semantics
๐ง In Progress (Phase 4) #
- โ Performance optimizations and batching
- โ Comprehensive monitoring and metrics
- โ Migration tools and utilities
- โ Production-ready documentation
๐ฎ Future #
- โ Multi-tenant support
- โ Event store clustering
- โ GraphQL integration for read models
- โ Real-time event streaming
- โ Advanced debugging tools
๐ค Ecosystem #
Eventador is part of a larger ecosystem:
๐ License #
This project is licensed under the MIT License - see the LICENSE file for details.
๐ Acknowledgments #
- Isar Team: For the excellent database foundation
- Dactor Community: For the actor model inspiration
- DuraQ Contributors: For reliable queue operations
- Event Sourcing Community: For patterns and best practices
๐ Support #
- Documentation: Full API Documentation
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Discord: Community Discord
Built with โค๏ธ for the Dart and Flutter community