Eventador

License: MIT

Eventador is an industrial-grade persistence and event sourcing extension for the Dactor actor system. It provides durable state management, event sourcing, and CBOR-based serialization, using the Isar database as its storage backend.

🚀 Features

Core Event Sourcing

  • Persistent Actors: Actors that survive system restarts with automatic state recovery
  • Event Sourcing: Complete command → event → state pipeline with audit trails
  • CBOR Serialization: Efficient binary serialization for optimal storage performance
  • Snapshot System: Configurable snapshots for fast recovery and performance optimization
  • Aggregate Root Pattern: Domain-driven design patterns for complex business logic

Advanced Patterns

  • Saga Pattern: Long-running distributed transactions with compensation actions
  • Event Projections: Real-time read models and materialized views
  • Command/Event/State Abstractions: Type-safe event sourcing patterns
  • Optimistic Concurrency Control: Version-based conflict resolution

Hybrid Architecture

Eventador uses a hybrid architecture that combines:

  • Dactor: High-performance actor model with supervision and messaging
  • Isar: Direct permanent event storage for immutable event logs
  • DuraQ: Operational workflows for sagas, projections, and command processing

This separation ensures permanent audit trails in Isar while handling transient operational workflows through DuraQ.

📦 Installation

Add Eventador to your pubspec.yaml:

dependencies:
  eventador: ^1.0.0
  isar: ^3.1.0+1
  cbor: ^6.0.0
  
  # Local dependencies for hybrid architecture
  dactor:
    path: ../dactor  # Actor model foundation
  duraq:
    path: ../duraq  # Operational workflows

dev_dependencies:
  isar_generator: ^3.1.0+1
  build_runner: ^2.4.7

🏃‍♂️ Quick Start

1. Basic Persistent Actor

import 'package:eventador/eventador.dart';
import 'package:isar/isar.dart'; // needed for Isar.initializeIsarCore in step 2

// Define your commands
class IncrementCommand extends Command {
  final String aggregateId;
  final int value;

  IncrementCommand({required this.aggregateId, required this.value});
}

// Define your events
class CounterIncrementedEvent extends Event {
  final String aggregateId;
  final int value;

  CounterIncrementedEvent({required this.aggregateId, required this.value});
}

// Create a persistent actor
class CounterActor extends PersistentActor {
  int _value = 0;

  CounterActor({
    required String persistenceId,
    required EventStore eventStore,
  }) : super(persistenceId: persistenceId, eventStore: eventStore);

  int get value => _value;

  @override
  Future<void> commandHandler(Command command) async {
    if (command is IncrementCommand) {
      final event = CounterIncrementedEvent(
        aggregateId: persistenceId,
        value: command.value,
      );
      await persistEvent(event);
    }
  }

  @override
  void eventHandler(Event event) {
    if (event is CounterIncrementedEvent) {
      _value += event.value;
    }
  }

  @override
  Future<dynamic> getSnapshotState() async => {'value': _value};

  @override
  Future<void> onSnapshot(dynamic snapshotState, int sequenceNumber) async {
    if (snapshotState is Map<String, dynamic>) {
      _value = snapshotState['value'] as int? ?? 0;
    }
  }
}

2. Initialize and Use

Future<void> main() async {
  // Initialize Isar for event storage
  await Isar.initializeIsarCore(download: true);
  final eventStore = await IsarEventStore.create(directory: './data');

  // Create and start the actor
  final counter = CounterActor(
    persistenceId: 'counter-1',
    eventStore: eventStore,
  );

  // Start the actor (triggers recovery)
  counter.preStart();
  await Future.delayed(Duration(milliseconds: 100));

  // Send commands
  await counter.onMessage(IncrementCommand(aggregateId: 'counter-1', value: 5));
  await counter.onMessage(IncrementCommand(aggregateId: 'counter-1', value: 3));

  print('Counter value: ${counter.value}'); // Output: Counter value: 8

  // Create snapshot for fast recovery
  await counter.createSnapshot();

  // Clean up
  await eventStore.close();
}
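
Conceptually, recovery restores the latest snapshot and then replays only the events recorded after it. The sketch below illustrates that idea with plain Dart collections. `CounterSnapshot`, `recoverCounter`, and the in-memory event list are hypothetical stand-ins, not Eventador APIs, and the real recovery path may differ.

```dart
// Hypothetical stand-ins for illustration only; not part of Eventador.
class CounterSnapshot {
  final int value;
  final int sequenceNumber; // sequence number of the last event included
  const CounterSnapshot(this.value, this.sequenceNumber);
}

/// Rebuilds a counter from an optional snapshot plus the full event log.
/// `increments[i]` is treated as the event with sequence number `i + 1`.
int recoverCounter(List<int> increments, [CounterSnapshot? snapshot]) {
  var value = snapshot?.value ?? 0;
  final start = snapshot?.sequenceNumber ?? 0;
  // Replay only the events that the snapshot does not already cover.
  for (var i = start; i < increments.length; i++) {
    value += increments[i];
  }
  return value;
}

void main() {
  final log = [5, 3, 4];
  // Full replay and snapshot-based recovery should agree.
  print(recoverCounter(log)); // 12
  print(recoverCounter(log, const CounterSnapshot(8, 2))); // 12
}
```

The snapshot only shortens the replay; the resulting state is the same either way, which is why snapshots can be deleted or rebuilt without losing information.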

🏗️ Architecture

Hybrid Architecture Benefits

┌──────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   Dactor         │    │   Eventador      │    │   DuraQ         │
│   Actors         │───▶│   Event Store    │───▶│   Operational   │
│                  │    │   (Isar Direct)  │    │   Queues        │
│ • PersistentActor│    │                  │    │                 │
│ • Supervision    │    │ • Events (∞)     │    │ • Commands      │
│ • Metrics        │    │ • Snapshots      │    │ • Sagas         │
│ • Ask Pattern    │    │ • Recovery       │    │ • Projections   │
└──────────────────┘    └──────────────────┘    └─────────────────┘

Component Responsibilities:

  • Dactor: Actor model foundation with supervision, messaging, and fault tolerance
  • Isar: Permanent event storage with immutable audit trails and fast queries
  • DuraQ: Reliable operational workflows with retry policies and dead letter queues

📚 Advanced Usage

Two Patterns for Command/Event Handling in Aggregates

Eventador provides two distinct approaches for handling commands and events in your AggregateRoot implementations. Understanding both patterns helps you choose the right approach for your use case.

Pattern 1: Override Pattern

Override the handleCommand() and eventHandler() methods directly. This pattern is type-safe, explicit, and IDE-friendly.

class BankAccountAggregate extends AggregateRoot<BankAccountState> {
  BankAccountAggregate({
    required String persistenceId,
    required EventStore eventStore,
  }) : super(persistenceId: persistenceId, eventStore: eventStore);

  @override
  BankAccountState createInitialState() => BankAccountState(
    accountId: persistenceId,
    balance: 0.0,
    isActive: false,
    version: 0,
    lastModified: DateTime.now(),
  );

  // OVERRIDE: Command Handler
  @override
  Future<List<Event>> handleCommand(BankAccountState currentState, Command command) async {
    return switch (command.runtimeType) {
      OpenAccountCommand => _handleOpenAccount(currentState, command as OpenAccountCommand),
      DepositMoneyCommand => _handleDeposit(currentState, command as DepositMoneyCommand),
      WithdrawMoneyCommand => _handleWithdraw(currentState, command as WithdrawMoneyCommand),
      _ => throw ArgumentError('Unknown command type: ${command.runtimeType}'),
    };
  }

  // OVERRIDE: Event Handler
  @override
  void eventHandler(Event event) {
    // CRITICAL: Ensure state is initialized during recovery
    ensureStateInitialized();
    
    if (event is! BankAccountEvent) {
      throw ArgumentError('Expected BankAccountEvent, got ${event.runtimeType}');
    }

    switch (event.runtimeType) {
      case AccountOpenedEvent:
        _applyAccountOpened(event as AccountOpenedEvent);
        break;
      case MoneyDepositedEvent:
        _applyMoneyDeposited(event as MoneyDepositedEvent);
        break;
      case MoneyWithdrawnEvent:
        _applyMoneyWithdrawn(event as MoneyWithdrawnEvent);
        break;
      default:
        throw ArgumentError('Unknown event type: ${event.runtimeType}');
    }
  }

  // Command handlers (return events)
  List<Event> _handleOpenAccount(BankAccountState state, OpenAccountCommand cmd) {
    if (state.isActive) {
      throw StateError('Account is already open');
    }
    return [AccountOpenedEvent(
      accountId: cmd.accountId,
      initialBalance: cmd.initialBalance,
    )];
  }

  List<Event> _handleDeposit(BankAccountState state, DepositMoneyCommand cmd) {
    if (!state.isActive) {
      throw StateError('Account is not active');
    }
    return [MoneyDepositedEvent(
      accountId: cmd.accountId,
      amount: cmd.amount,
    )];
  }

  // Event appliers (mutate currentState directly)
  void _applyAccountOpened(AccountOpenedEvent event) {
    currentState.isActive = true;
    currentState.balance = event.initialBalance;
    currentState.version++;
    currentState.lastModified = event.timestamp;
  }

  void _applyMoneyDeposited(MoneyDepositedEvent event) {
    currentState.balance += event.amount;
    currentState.version++;
    currentState.lastModified = event.timestamp;
  }
}

✅ Advantages of Override Pattern:

  • Type Safety: Handlers receive concrete command and event types; with a sealed event hierarchy and an exhaustive switch, the compiler can also flag missing handlers
  • IDE Support: Jump-to-definition and refactoring tools work as expected
  • Explicit: A single switch statement shows all supported commands/events
  • Debugging: Easy to set breakpoints and trace execution
  • Validation: Can use pattern matching (switch expressions) in Dart 3+

❌ Disadvantages:

  • More boilerplate for simple aggregates
  • Requires explicit switch/case for each command/event type

When to Use:

  • Production applications with complex business logic
  • When you need strong type safety and validation
  • When working in a team (explicit contracts are clearer)
  • When you have more than 3-5 commands or events
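
How much the compiler checks in this pattern depends on how the switch is written. A switch on `runtimeType` with a `default` branch is verified only at runtime. If the events form a `sealed` hierarchy, a Dart 3 switch over the event itself must be exhaustive, so a forgotten case becomes a compile-time error. The following is a minimal sketch with hypothetical event classes; they deliberately do not extend Eventador's `Event`, whose shape is not assumed here.

```dart
// Hypothetical event hierarchy for illustration; real events would extend
// Eventador's Event base class.
sealed class AccountEvent {}

class Opened extends AccountEvent {
  final double initialBalance;
  Opened(this.initialBalance);
}

class Deposited extends AccountEvent {
  final double amount;
  Deposited(this.amount);
}

class Withdrawn extends AccountEvent {
  final double amount;
  Withdrawn(this.amount);
}

/// Pure fold step: returns the balance after applying one event.
/// Removing any case below is a compile-time error because a switch
/// over a sealed type must be exhaustive.
double applyToBalance(double balance, AccountEvent event) => switch (event) {
      Opened(:final initialBalance) => initialBalance,
      Deposited(:final amount) => balance + amount,
      Withdrawn(:final amount) => balance - amount,
    };

void main() {
  final events = <AccountEvent>[Opened(100), Deposited(50), Withdrawn(30)];
  final balance = events.fold<double>(0, applyToBalance);
  print(balance); // 120.0
}
```

Matching on the value with type patterns also removes the manual `as` casts that a `runtimeType` switch requires.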

Pattern 2: Registry Pattern (Convenience for Simple Cases)

Register command and event handlers in maps. This pattern is concise and dynamic but less type-safe.

class SimpleCounterAggregate extends AggregateRoot<CounterState> {
  SimpleCounterAggregate({
    required String persistenceId,
    required EventStore eventStore,
  }) : super(persistenceId: persistenceId, eventStore: eventStore) {
    // Register command handlers in constructor
    commandHandlers[IncrementCommand] = _handleIncrement;
    commandHandlers[DecrementCommand] = _handleDecrement;
    
    // Register event handlers in constructor
    eventHandlers[CounterIncrementedEvent] = _applyIncrement;
    eventHandlers[CounterDecrementedEvent] = _applyDecrement;
  }

  @override
  CounterState createInitialState() => CounterState(
    counterId: persistenceId,
    value: 0,
    version: 0,
    lastModified: DateTime.now(),
  );

  // Command handlers registered in map
  Future<List<Event>> _handleIncrement(CounterState state, Command command) async {
    final cmd = command as IncrementCommand;
    return [CounterIncrementedEvent(
      counterId: cmd.counterId,
      incrementBy: cmd.incrementBy,
    )];
  }

  Future<List<Event>> _handleDecrement(CounterState state, Command command) async {
    final cmd = command as DecrementCommand;
    if (state.value - cmd.decrementBy < 0) {
      throw StateError('Counter cannot go negative');
    }
    return [CounterDecrementedEvent(
      counterId: cmd.counterId,
      decrementBy: cmd.decrementBy,
    )];
  }

  // Event handlers registered in map
  void _applyIncrement(Event event) {
    final evt = event as CounterIncrementedEvent;
    currentState.value += evt.incrementBy;
    currentState.version++;
    currentState.lastModified = evt.timestamp;
  }

  void _applyDecrement(Event event) {
    final evt = event as CounterDecrementedEvent;
    currentState.value -= evt.decrementBy;
    currentState.version++;
    currentState.lastModified = evt.timestamp;
  }
}

✅ Advantages of Registry Pattern:

  • Concise: Less boilerplate for simple aggregates
  • Dynamic: Can register handlers at runtime if needed
  • Composable: Can share handlers across multiple aggregates

❌ Disadvantages:

  • Runtime Errors: Missing handlers only discovered when command is sent
  • Less Type Safety: Requires manual casting in handler methods
  • Harder to Debug: No explicit list of supported commands/events
  • IDE Limitations: Jump-to-definition doesn't work as well

When to Use:

  • Simple aggregates with few commands/events (2-3)
  • Prototyping or rapid development
  • When handler logic is shared across aggregates
  • Educational examples or tutorials
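
The runtime-only nature of the registry approach is easiest to see in isolation. The sketch below is a simplified, hypothetical registry keyed by `Type`; Eventador's `commandHandlers` and `eventHandlers` maps may use different signatures. It shows that a missing registration is detected only when a message of that type is dispatched.

```dart
// Hypothetical, simplified registry for illustration only.
typedef Handler = int Function(int state, Object message);

class Increment {
  final int by;
  Increment(this.by);
}

class Decrement {
  final int by;
  Decrement(this.by);
}

class HandlerRegistry {
  final Map<Type, Handler> _handlers = {};

  void register(Type type, Handler handler) => _handlers[type] = handler;

  /// Looks up the handler by the message's runtime type. A missing
  /// registration surfaces only here, when a message is dispatched.
  int dispatch(int state, Object message) {
    final handler = _handlers[message.runtimeType];
    if (handler == null) {
      throw StateError('No handler registered for ${message.runtimeType}');
    }
    return handler(state, message);
  }
}

void main() {
  final registry = HandlerRegistry()
    ..register(Increment, (state, m) => state + (m as Increment).by);

  print(registry.dispatch(0, Increment(2))); // 2

  try {
    registry.dispatch(2, Decrement(1)); // never registered
  } on StateError catch (e) {
    print(e.message);
  }
}
```

Note the `as Increment` cast inside the handler: the map erases the concrete type, which is the "manual casting" cost listed above.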

Critical: Event Handler State Initialization

Whenever you override eventHandler(), you MUST call ensureStateInitialized() at the start of the handler. This guarantees that state exists before the first event is applied when events are replayed during recovery.

@override
void eventHandler(Event event) {
  // CRITICAL: Call this before processing events
  ensureStateInitialized();
  
  // Then process your events...
  switch (event.runtimeType) {
    // ...
  }
}

Why this is required:

  • During recovery, Eventador replays events from storage to reconstruct state
  • The first event might arrive before state is initialized
  • Without this call, you'll get Null check operator used on a null value errors
  • The base class normally handles this, but overriding bypasses that logic

This is NOT required for the Registry Pattern - the base class handles initialization automatically when using the eventHandlers map.
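
The failure mode can be reproduced with a few lines of plain Dart. The base class below is a hypothetical stand-in, not Eventador's `AggregateRoot`; it only assumes that state is held in a nullable field and created lazily, which is one plausible way such a base class could work.

```dart
// Hypothetical base class for illustration; a sketch of the
// lazy-initialization idea only.
abstract class LazyStateHolder<S> {
  S? _state;

  S createInitialState();

  /// Creates the state on first use; later calls are no-ops.
  void ensureStateInitialized() {
    _state ??= createInitialState();
  }

  /// Non-null accessor: throws if nothing has initialized the state yet.
  S get currentState => _state!;
}

class CounterState {
  int value = 0;
}

class Counter extends LazyStateHolder<CounterState> {
  @override
  CounterState createInitialState() => CounterState();

  // Skips initialization, like an overridden handler that forgets the call.
  void applyUnsafe(int delta) => currentState.value += delta;

  void applySafe(int delta) {
    ensureStateInitialized();
    currentState.value += delta;
  }
}

void main() {
  final replayed = Counter()..applySafe(5);
  print(replayed.currentState.value); // 5

  try {
    Counter().applyUnsafe(5); // first replayed event, state still null
  } catch (e) {
    print('Failed before initialization: $e');
  }
}
```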


Comparison Table

| Feature | Override Pattern | Registry Pattern |
|---|---|---|
| Type Safety | ✅ Strong | ⚠️ Runtime only |
| IDE Support | ✅ Excellent | ⚠️ Limited |
| Boilerplate | ⚠️ More verbose | ✅ Concise |
| Debugging | ✅ Easy | ⚠️ Harder |
| Runtime Flexibility | ❌ Static | ✅ Dynamic |
| Production Ready | ✅ Yes | ⚠️ Simple cases only |
| State Init | ⚠️ Manual ensureStateInitialized() | ✅ Automatic |

You can technically mix both patterns, but this is not recommended as it creates confusion:

// ❌ DON'T DO THIS - Pick one pattern and stick with it
class MixedAggregate extends AggregateRoot<MyState> {
  MixedAggregate(...) : super(...) {
    commandHandlers[SomeCommand] = _handleSome; // Registry pattern
  }
  
  @override
  Future<List<Event>> handleCommand(...) async {
    // Override pattern - this takes precedence over registry!
    // Your registered handlers in commandHandlers will be ignored
  }
}

Important: If you override handleCommand(), the commandHandlers map is ignored. Same for eventHandler() and eventHandlers map. The override always takes precedence.
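
This precedence is what ordinary method overriding would produce if the base class consults the maps only inside its default handler implementations. That internal design is an assumption here; the toy classes below are hypothetical and exist only to show the mechanism.

```dart
// Hypothetical base class for illustration; not Eventador's AggregateRoot.
class RegistryBase {
  final Map<Type, String Function(Object)> commandHandlers = {};

  /// Default dispatch: consult the map.
  String handleCommand(Object command) {
    final handler = commandHandlers[command.runtimeType];
    return handler == null ? 'unhandled' : handler(command);
  }
}

class Ping {}

class RegistryOnly extends RegistryBase {
  RegistryOnly() {
    commandHandlers[Ping] = (_) => 'from registry';
  }
}

class Mixed extends RegistryBase {
  Mixed() {
    commandHandlers[Ping] = (_) => 'from registry';
  }

  // Replaces the map lookup entirely; the registration above is never read.
  @override
  String handleCommand(Object command) => 'from override';
}

void main() {
  print(RegistryOnly().handleCommand(Ping())); // from registry
  print(Mixed().handleCommand(Ping())); // from override
}
```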


Recommendation

For production applications, use the Override Pattern:

  • It's explicit, type-safe, and maintainable
  • The small amount of extra boilerplate is worth the safety
  • Your future self (and team members) will thank you

For simple examples or prototypes, Registry Pattern is fine:

  • Quick to write and understand
  • Good for educational purposes
  • Acceptable for aggregates with 2-3 commands

Aggregate Root Pattern

class BankAccountState extends State {
  final String accountId;
  final double balance;
  final bool isActive;

  BankAccountState({
    required this.accountId,
    required this.balance,
    required this.isActive,
    required super.version,
    required super.lastModified,
  });
}

class BankAccountAggregate extends AggregateRoot<BankAccountState> {
  BankAccountAggregate({
    required String persistenceId,
    required EventStore eventStore,
  }) : super(persistenceId: persistenceId, eventStore: eventStore);

  @override
  BankAccountState get initialState => BankAccountState(
    accountId: persistenceId,
    balance: 0.0,
    isActive: false,
    version: 0,
    lastModified: DateTime.now(),
  );

  @override
  List<Event> handleCommand(BankAccountState currentState, Command command) {
    if (command is OpenAccountCommand && !currentState.isActive) {
      return [AccountOpenedEvent(
        accountId: command.accountId,
        initialBalance: command.initialBalance,
      )];
    } else if (command is DepositMoneyCommand && currentState.isActive) {
      return [MoneyDepositedEvent(
        accountId: command.accountId,
        amount: command.amount,
      )];
    }
    return [];
  }

  @override
  BankAccountState applyEvent(BankAccountState currentState, Event event) {
    if (event is AccountOpenedEvent) {
      return currentState.copyWith(
        balance: event.initialBalance,
        isActive: true,
        version: currentState.version + 1,
      );
    } else if (event is MoneyDepositedEvent) {
      return currentState.copyWith(
        balance: currentState.balance + event.amount,
        version: currentState.version + 1,
      );
    }
    return currentState;
  }
}

Saga Pattern for Distributed Transactions

// Define saga state
class OrderSagaState extends SagaState {
  final String orderId;
  final String customerId;
  final double amount;
  final bool paymentProcessed;
  final bool orderShipped;

  OrderSagaState({
    required this.orderId,
    required this.customerId,
    required this.amount,
    required super.status,
    required super.lastUpdated,
    this.paymentProcessed = false,
    this.orderShipped = false,
  });

  @override
  OrderSagaState copyWith({
    SagaStatus? status,
    DateTime? lastUpdated,
    bool? paymentProcessed,
    bool? orderShipped,
  }) {
    return OrderSagaState(
      orderId: orderId,
      customerId: customerId,
      amount: amount,
      status: status ?? this.status,
      lastUpdated: lastUpdated ?? this.lastUpdated,
      paymentProcessed: paymentProcessed ?? this.paymentProcessed,
      orderShipped: orderShipped ?? this.orderShipped,
    );
  }
}

// Define saga implementation
class OrderSaga extends Saga {
  late OrderSagaState _state;

  OrderSaga({
    required String persistenceId,
    required EventStore eventStore,
    required QueueManager duraqManager,
  }) : super(
    persistenceId: persistenceId,
    eventStore: eventStore,
    duraqManager: duraqManager,
  );

  @override
  SagaState get sagaState => _state;

  @override
  Future<void> startSaga(Command initialCommand) async {
    if (initialCommand is CreateOrderCommand) {
      _state = OrderSagaState(
        orderId: initialCommand.orderId,
        customerId: initialCommand.customerId,
        amount: initialCommand.amount,
        status: SagaStatus.running,
        lastUpdated: DateTime.now(),
      );
      
      await saveSagaState();
      
      // Send command to payment processor using DuraQ
      await sendCommand('payment_processor', ProcessPaymentCommand(
        orderId: _state.orderId,
        amount: _state.amount,
      ));
    }
  }

  @override
  Future<void> handleSagaEvent(Event event) async {
    if (event is PaymentProcessedEvent) {
      _state = _state.copyWith(
        paymentProcessed: true,
        lastUpdated: DateTime.now(),
      );
      await saveSagaState();
      
      // Continue with shipping
      await sendCommand('shipping_processor', ShipOrderCommand(
        orderId: _state.orderId,
        address: '123 Main St', // In real implementation, get from order
      ));
    } else if (event is OrderShippedEvent) {
      _state = _state.copyWith(
        orderShipped: true,
        status: SagaStatus.completed,
        lastUpdated: DateTime.now(),
      );
      await saveSagaState();
      print('Order ${_state.orderId} completed successfully!');
    } else if (event is OrderFailedEvent) {
      _state = _state.copyWith(
        status: SagaStatus.failed,
        lastUpdated: DateTime.now(),
      );
      await saveSagaState();
      
      // Start compensation
      await compensate([event]);
    }
  }

  @override
  Future<void> compensate(List<Event> eventsToCompensate) async {
    _state = _state.copyWith(
      status: SagaStatus.compensating,
      lastUpdated: DateTime.now(),
    );
    await saveSagaState();
    
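    // Refund if the payment step completed. handleSagaEvent passes only the
    // failure event, which carries no payment details, so the saga state is
    // consulted as well as the supplied events.
    final paymentSeen =
        eventsToCompensate.any((event) => event is PaymentProcessedEvent);
    if (paymentSeen || _state.paymentProcessed) {
      await sendCommand('payment_processor', RefundPaymentCommand(
        orderId: _state.orderId,
        amount: _state.amount,
      ));
    }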
  }

  @override
  Future<void> commandHandler(Command command) async {
    if (command is CreateOrderCommand) {
      await startSaga(command);
    }
    // Handle other saga-specific commands
  }

  @override
  void eventHandler(Event event) {
    // Route saga events to handleSagaEvent
    handleSagaEvent(event);
  }

  // Schedule timeout for saga steps
  Future<void> schedulePaymentTimeout() async {
    await scheduleTimeout(
      Duration(minutes: 5),
      'payment-timeout-${_state.orderId}',
    );
  }
}
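
Stripped of persistence and queues, the compensation idea in the saga above comes down to undoing the steps that already succeeded, in reverse order. The sketch below shows that control flow with hypothetical types (`SagaStep`, `runSaga`) that are not Eventador or DuraQ APIs; it runs synchronously and in memory purely for illustration.

```dart
// Hypothetical step model for illustration only.
class SagaStep {
  final String name;
  final void Function(List<String> log) action;
  final void Function(List<String> log) compensation;
  SagaStep(this.name, this.action, this.compensation);
}

/// Runs steps in order. If one throws, the steps that already completed
/// are compensated in reverse order. Returns true on full success.
bool runSaga(List<SagaStep> steps, List<String> log) {
  final completed = <SagaStep>[];
  for (final step in steps) {
    try {
      step.action(log);
      completed.add(step);
    } catch (_) {
      for (final done in completed.reversed) {
        done.compensation(log);
      }
      return false;
    }
  }
  return true;
}

void main() {
  final log = <String>[];
  final ok = runSaga([
    SagaStep('reserve', (l) => l.add('reserve stock'),
        (l) => l.add('release stock')),
    SagaStep('pay', (l) => l.add('charge card'), (l) => l.add('refund card')),
    SagaStep('ship', (l) => throw StateError('carrier unavailable'), (l) {}),
  ], log);

  print(ok); // false
  print(log); // [reserve stock, charge card, refund card, release stock]
}
```

In a durable saga the list of completed steps must itself be persisted, which is the role `saveSagaState()` plays in the example above.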

### Saga Orchestration

The `SagaCoordinator` is a background processor responsible for dequeuing commands and timeouts from reliable queues and delivering them to the appropriate saga instances. It is not an actor and should not be spawned.

Here is a complete example of how to set up and run a saga, including the `SagaCoordinator`:

```dart
import 'dart:async';
import 'dart:io';
import 'package:dactor/dactor.dart';
import 'package:duraq/duraq.dart';
import 'package:eventador/eventador.dart';
import 'package:isar/isar.dart';

// ... (Saga, Command, and Event definitions from above)

Future<void> main() async {
  final tempDir = Directory.systemTemp.createTempSync('isar_test_').path;

  // Initialize Isar core
  await Isar.initializeIsarCore(download: true);

  // Create Isar instance with required schemas
  final isar = await Isar.open(
    [...IsarStorage.requiredSchemas, ...IsarEventStore.requiredSchemas],
    directory: tempDir,
    name: 'tmp_queue',
  );
  final storage = IsarStorage(isar);
  final duraqManager = QueueManager(storage);

  // Initialize Eventador with external Isar instance
  final eventStore = IsarEventStore(isar);
  final actorSystem = LocalActorSystem();

  // The SagaCoordinator is a background processor, not an actor.
  final sagaCoordinator = SagaCoordinator(duraqManager, eventStore, actorSystem);
  unawaited(sagaCoordinator.processSagaCommands());
  unawaited(sagaCoordinator.processSagaTimeouts());

  // Spawn worker actors
  await actorSystem.spawn('payment_processor', () => PaymentProcessor());
  await actorSystem.spawn('shipping_processor', () => ShippingProcessor());

  // Start the saga
  final orderId = 'order_123';
  final createOrderCommand = CreateOrderCommand(
    orderId: orderId,
    customerId: 'customer_456',
    amount: 99.99,
  );

  final saga = await actorSystem.spawn(
    orderId,
    () => OrderSaga(
      persistenceId: orderId,
      eventStore: eventStore,
      duraqManager: duraqManager,
    ),
  );

  print('Starting saga for order $orderId...');
  saga.tell(createOrderCommand);

  // Wait for the saga to complete
  await Future.delayed(const Duration(seconds: 5));

  // Clean up
  print('Shutting down...');
  await actorSystem.shutdown();
  await isar.close(deleteFromDisk: true);
  print('Example finished.');
}

// Example commands and events
class CreateOrderCommand extends Command {
  final String orderId;
  final String customerId;
  final double amount;

  CreateOrderCommand({
    required this.orderId,
    required this.customerId,
    required this.amount,
  });
}

class ProcessPaymentCommand extends Command {
  final String orderId;
  final double amount;

  ProcessPaymentCommand({
    required this.orderId,
    required this.amount,
  });
}

class ShipOrderCommand extends Command {
  final String orderId;
  final String address;

  ShipOrderCommand({
    required this.orderId,
    required this.address,
  });
}

class RefundPaymentCommand extends Command {
  final String orderId;
  final double amount;

  RefundPaymentCommand({
    required this.orderId,
    required this.amount,
  });
}

class PaymentProcessedEvent extends Event {
  final String orderId;
  final String transactionId;

  PaymentProcessedEvent({
    required this.orderId,
    required this.transactionId,
  });
}

class OrderShippedEvent extends Event {
  final String orderId;
  final String trackingNumber;

  OrderShippedEvent({
    required this.orderId,
    required this.trackingNumber,
  });
}

class OrderFailedEvent extends Event {
  final String orderId;
  final String reason;

  OrderFailedEvent({
    required this.orderId,
    required this.reason,
  });
}
```

Event Projections for Read Models

class UserStatisticsProjection extends Projection<UserStatistics> {
  UserStatistics _readModel = UserStatistics.empty();

  @override
  String get projectionId => 'user-statistics';

  @override
  UserStatistics get readModel => _readModel;

  @override
  List<Type> get interestedEventTypes => [
    UserRegisteredEvent,
    UserLoginEvent,
    UserLogoutEvent,
    UserProfileUpdatedEvent,
  ];

  @override
  Future<bool> handle(Event event) async {
    switch (event.runtimeType) {
      case UserRegisteredEvent:
        _handleUserRegistered(event as UserRegisteredEvent);
        return true;
      case UserLoginEvent:
        _handleUserLogin(event as UserLoginEvent);
        return true;
      case UserLogoutEvent:
        _handleUserLogout(event as UserLogoutEvent);
        return true;
      case UserProfileUpdatedEvent:
        _handleUserProfileUpdated(event as UserProfileUpdatedEvent);
        return true;
      default:
        return false;
    }
  }

  void _handleUserRegistered(UserRegisteredEvent event) {
    _readModel = _readModel.copyWith(
      totalUsers: _readModel.totalUsers + 1,
      newUsersToday: _readModel.newUsersToday + 1,
      lastUpdated: DateTime.now(),
    );
  }

  void _handleUserLogin(UserLoginEvent event) {
    _readModel = _readModel.copyWith(
      totalLogins: _readModel.totalLogins + 1,
      activeUsersToday: _readModel.activeUsersToday + 1,
      lastUpdated: DateTime.now(),
    );
  }

  void _handleUserLogout(UserLogoutEvent event) {
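    // Calculate session duration
    final sessionDuration = event.timestamp.difference(event.loginTime);
    // Blend with the previous average. Note: this weights the newest session
    // equally with all earlier ones; a true mean would also need a session count.
    final avgDuration = _readModel.averageSessionDuration;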
    final newAverage = avgDuration != null
        ? Duration(milliseconds: ((avgDuration.inMilliseconds + sessionDuration.inMilliseconds) / 2).round())
        : sessionDuration;
    
    _readModel = _readModel.copyWith(
      averageSessionDuration: newAverage,
      lastUpdated: DateTime.now(),
    );
  }

  void _handleUserProfileUpdated(UserProfileUpdatedEvent event) {
    _readModel = _readModel.copyWith(
      profileUpdatesToday: _readModel.profileUpdatesToday + 1,
      lastUpdated: DateTime.now(),
    );
  }

  @override
  Future<void> rebuild() async {
    _readModel = UserStatistics.empty();
    // In a real implementation, this would replay all events
  }

  @override
  Future<void> reset() async {
    _readModel = UserStatistics.empty();
  }

  @override
  Future<int> getCheckpoint() async {
    // This would be loaded from storage
    return 0;
  }

  @override
  Future<void> updateCheckpoint(int sequenceNumber) async {
    // This would be saved to storage
  }
}

// Read model for user statistics
class UserStatistics {
  final int totalUsers;
  final int newUsersToday;
  final int activeUsersToday;
  final int totalLogins;
  final int profileUpdatesToday;
  final Duration? averageSessionDuration;
  final DateTime lastUpdated;

  const UserStatistics({
    required this.totalUsers,
    required this.newUsersToday,
    required this.activeUsersToday,
    required this.totalLogins,
    required this.profileUpdatesToday,
    this.averageSessionDuration,
    required this.lastUpdated,
  });

  factory UserStatistics.empty() {
    return UserStatistics(
      totalUsers: 0,
      newUsersToday: 0,
      activeUsersToday: 0,
      totalLogins: 0,
      profileUpdatesToday: 0,
      lastUpdated: DateTime.now(),
    );
  }

  UserStatistics copyWith({
    int? totalUsers,
    int? newUsersToday,
    int? activeUsersToday,
    int? totalLogins,
    int? profileUpdatesToday,
    Duration? averageSessionDuration,
    DateTime? lastUpdated,
  }) {
    return UserStatistics(
      totalUsers: totalUsers ?? this.totalUsers,
      newUsersToday: newUsersToday ?? this.newUsersToday,
      activeUsersToday: activeUsersToday ?? this.activeUsersToday,
      totalLogins: totalLogins ?? this.totalLogins,
      profileUpdatesToday: profileUpdatesToday ?? this.profileUpdatesToday,
      averageSessionDuration: averageSessionDuration ?? this.averageSessionDuration,
      lastUpdated: lastUpdated ?? this.lastUpdated,
    );
  }

  Map<String, dynamic> toMap() {
    return {
      'totalUsers': totalUsers,
      'newUsersToday': newUsersToday,
      'activeUsersToday': activeUsersToday,
      'totalLogins': totalLogins,
      'profileUpdatesToday': profileUpdatesToday,
      'averageSessionDurationMs': averageSessionDuration?.inMilliseconds,
      'lastUpdated': lastUpdated.toIso8601String(),
    };
  }

  factory UserStatistics.fromMap(Map<String, dynamic> map) {
    return UserStatistics(
      totalUsers: map['totalUsers'] as int,
      newUsersToday: map['newUsersToday'] as int,
      activeUsersToday: map['activeUsersToday'] as int,
      totalLogins: map['totalLogins'] as int,
      profileUpdatesToday: map['profileUpdatesToday'] as int,
      averageSessionDuration: map['averageSessionDurationMs'] != null
          ? Duration(milliseconds: map['averageSessionDurationMs'] as int)
          : null,
      lastUpdated: DateTime.parse(map['lastUpdated'] as String),
    );
  }
}

// Example user events
class UserRegisteredEvent extends Event with AggregateEvent, SerializableEvent {
  final String userId;
  final String email;
  final String username;

  UserRegisteredEvent({
    required this.userId,
    required this.email,
    required this.username,
  });

  @override
  String get aggregateId => userId;

  @override
  String get aggregateType => 'User';

  @override
  Map<String, dynamic> getEventData() {
    return {
      'userId': userId,
      'email': email,
      'username': username,
    };
  }
}

class UserLoginEvent extends Event with AggregateEvent, SerializableEvent {
  final String userId;
  final String sessionId;

  UserLoginEvent({
    required this.userId,
    required this.sessionId,
  });

  @override
  String get aggregateId => userId;

  @override
  String get aggregateType => 'User';

  @override
  Map<String, dynamic> getEventData() {
    return {
      'userId': userId,
      'sessionId': sessionId,
    };
  }
}

class UserLogoutEvent extends Event with AggregateEvent, SerializableEvent {
  final String userId;
  final String sessionId;
  final DateTime loginTime;

  UserLogoutEvent({
    required this.userId,
    required this.sessionId,
    required this.loginTime,
  });

  @override
  String get aggregateId => userId;

  @override
  String get aggregateType => 'User';

  @override
  Map<String, dynamic> getEventData() {
    return {
      'userId': userId,
      'sessionId': sessionId,
      'loginTime': loginTime.toIso8601String(),
    };
  }
}
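
The logout handler in the projection above averages the previous value with the newest session, which gives the latest session as much weight as all earlier ones combined. If an arithmetic mean is wanted, the read model also has to track how many sessions it has seen. The sketch below shows the incremental-mean update; `SessionAverage` and its fields are hypothetical and are not part of the `UserStatistics` read model.

```dart
// Hypothetical helper for illustration only.
class SessionAverage {
  final int sessionCount;
  final Duration mean;
  const SessionAverage(this.sessionCount, this.mean);

  static const empty = SessionAverage(0, Duration.zero);

  /// Incremental mean: newMean = mean + (x - mean) / n, where n counts x.
  SessionAverage add(Duration session) {
    final n = sessionCount + 1;
    final deltaMs = session.inMilliseconds - mean.inMilliseconds;
    final newMeanMs = mean.inMilliseconds + (deltaMs / n).round();
    return SessionAverage(n, Duration(milliseconds: newMeanMs));
  }
}

void main() {
  var avg = SessionAverage.empty;
  for (final minutes in [10, 20, 60]) {
    avg = avg.add(Duration(minutes: minutes));
  }
  print(avg.mean.inMinutes); // 30
}
```

With the same three sessions, the pairwise blend used in the projection would report 37.5 minutes rather than 30.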

⚙️ Configuration

Snapshot Configuration

// Development configuration
final devConfig = SnapshotConfig.development; // Snapshots every 50 events

// Production configuration  
final prodConfig = SnapshotConfig.production; // Snapshots every 200 events

// Custom configuration
final customConfig = SnapshotConfig(
  eventCountThreshold: 100,
  timeThreshold: Duration(minutes: 5),
  maxSnapshotsToKeep: 3,
  enableCompression: true,
  compressionThreshold: 2048,
  minTimeBetweenSnapshots: Duration(minutes: 1),
);

// Use with PersistentActor
final actor = MyPersistentActor(
  persistenceId: 'my-actor',
  eventStore: eventStore,
  snapshotConfig: customConfig,
);
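
How the thresholds interact is not spelled out here. One plausible reading is that a snapshot is taken when either the event count or the elapsed time reaches its threshold, but never more often than `minTimeBetweenSnapshots`. The sketch below encodes that reading; `SnapshotPolicy` and `shouldSnapshot` are hypothetical names, and Eventador's actual rule may differ.

```dart
// Hypothetical policy for illustration. The field names mirror the
// SnapshotConfig example, but how Eventador combines them is an assumption.
class SnapshotPolicy {
  final int eventCountThreshold;
  final Duration timeThreshold;
  final Duration minTimeBetweenSnapshots;

  const SnapshotPolicy({
    required this.eventCountThreshold,
    required this.timeThreshold,
    required this.minTimeBetweenSnapshots,
  });

  /// Snapshot when either threshold is reached, but never more often
  /// than minTimeBetweenSnapshots.
  bool shouldSnapshot({
    required int eventsSinceSnapshot,
    required Duration sinceLastSnapshot,
  }) {
    if (sinceLastSnapshot < minTimeBetweenSnapshots) return false;
    return eventsSinceSnapshot >= eventCountThreshold ||
        sinceLastSnapshot >= timeThreshold;
  }
}

void main() {
  const policy = SnapshotPolicy(
    eventCountThreshold: 100,
    timeThreshold: Duration(minutes: 5),
    minTimeBetweenSnapshots: Duration(minutes: 1),
  );

  // Enough events, but the previous snapshot is too recent.
  print(policy.shouldSnapshot(
      eventsSinceSnapshot: 150,
      sinceLastSnapshot: const Duration(seconds: 30))); // false
  // Few events, but the time threshold has elapsed.
  print(policy.shouldSnapshot(
      eventsSinceSnapshot: 10,
      sinceLastSnapshot: const Duration(minutes: 6))); // true
}
```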

External Isar Instance

For shared database scenarios:

// Create shared Isar instance
final isar = await Isar.open(
  [...IsarStorage.requiredSchemas, ...IsarEventStore.requiredSchemas],
  directory: '/path/to/shared/db',
  name: 'shared_database',
);

// Use with multiple components
final storage = IsarStorage(isar);
final eventStore = IsarEventStore(isar);
final duraqManager = QueueManager(storage);

🔑 Event Type Registry (Critical)

⚠️ IMPORTANT: Before using Eventador with CBOR serialization, you MUST register all your event types with the EventRegistry. This is required for event deserialization from storage after system restarts.

Why Event Registration is Required

When events are persisted to Isar using CBOR serialization, they are stored as binary data along with their type name. During recovery (e.g., after a system restart), Eventador needs to deserialize these events back into their original Dart objects. The EventRegistry provides the mapping from type names to factory functions that can reconstruct the events.

Without registration, you will get:

ArgumentError: Event type 'YourEvent' not registered
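
To make the mechanism concrete, here is a minimal, self-contained sketch of a type-name-to-factory registry. `SimpleEventRegistry` and `Deposited` are hypothetical stand-ins written for this illustration; they are not Eventador's `EventRegistry` or its internals, only the general pattern described above.

```dart
// Signature of a factory that rebuilds an event from its stored map.
typedef EventFactory = Object Function(Map<String, dynamic> map);

// Hypothetical stand-in for a registry; not Eventador's implementation.
class SimpleEventRegistry {
  final Map<String, EventFactory> _factories = {};

  void register(String typeName, EventFactory factory) {
    _factories[typeName] = factory;
  }

  /// Looks up the factory stored under [typeName] and rebuilds the event.
  Object deserialize(String typeName, Map<String, dynamic> data) {
    final factory = _factories[typeName];
    if (factory == null) {
      throw ArgumentError("Event type '$typeName' not registered");
    }
    return factory(data);
  }
}

// Hypothetical event used only for this example.
class Deposited {
  final int amount;
  Deposited(this.amount);

  static Deposited fromMap(Map<String, dynamic> map) =>
      Deposited(map['amount'] as int);
}

void main() {
  final registry = SimpleEventRegistry();
  registry.register('Deposited', Deposited.fromMap);

  // A stored record carries the type name plus the decoded payload.
  final event = registry.deserialize('Deposited', {'amount': 25}) as Deposited;
  print(event.amount); // 25

  // An unregistered type name cannot be mapped back to a Dart class.
  try {
    registry.deserialize('Withdrawn', {'amount': 5});
  } on ArgumentError catch (e) {
    print(e.message); // Event type 'Withdrawn' not registered
  }
}
```

The stored type name is the only link between the binary payload and a Dart class, which is why a missing registration surfaces only at recovery time.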

When to Register

Event types MUST be registered during application initialization, BEFORE:

  • Creating any AggregateRoot instances
  • Starting ProjectionManager
  • Spawning any PersistentActor instances

How to Register Events

import 'package:eventador/eventador.dart';

// During application startup (before any event sourcing operations)
void registerEventTypes() {
  // Register each event type with its type name and fromMap factory
  EventRegistry.register<AccountOpenedEvent>(
    'AccountOpenedEvent',
    (map) => AccountOpenedEvent.fromMap(map),
  );
  
  EventRegistry.register<MoneyDepositedEvent>(
    'MoneyDepositedEvent',
    (map) => MoneyDepositedEvent.fromMap(map),
  );
  
  EventRegistry.register<MoneyWithdrawnEvent>(
    'MoneyWithdrawnEvent',
    (map) => MoneyWithdrawnEvent.fromMap(map),
  );
  
  // ... register all other event types
}

// Call during app initialization
Future<void> main() async {
  // Initialize Isar
  await Isar.initializeIsarCore(download: true);
  
  // Register event types FIRST
  registerEventTypes();
  
  // Then initialize event store and actor system
  final eventStore = await IsarEventStore.create(directory: './data');
  final actorSystem = LocalActorSystem();
  
  // Now safe to spawn persistent actors
  // ...
}

Event fromMap Requirements

Each event class must implement a fromMap factory method that can reconstruct the event from a Map:

class AccountOpenedEvent extends Event with AggregateEvent, SerializableEvent {
  final String accountId;
  final double initialBalance;
  
  AccountOpenedEvent({
    required this.accountId,
    required this.initialBalance,
    String? eventId,
    DateTime? timestamp,
    int? version,
  }) : super(
    eventId: eventId ?? Uuid().v4(),
    timestamp: timestamp ?? DateTime.now(),
    version: version ?? 1,
  );
  
  @override
  String get aggregateId => accountId;
  
  @override
  String get aggregateType => 'BankAccount';
  
  @override
  Map<String, dynamic> getEventData() {
    return {
      'accountId': accountId,
      'initialBalance': initialBalance,
    };
  }
  
  // REQUIRED: Factory method for deserialization
  static AccountOpenedEvent fromMap(Map<String, dynamic> map) {
    return AccountOpenedEvent(
      accountId: map['accountId'] as String,
      // Accept int or double: some encoders store whole numbers as integers
      initialBalance: (map['initialBalance'] as num).toDouble(),
      eventId: map['eventId'] as String?,
      timestamp: map['timestamp'] != null 
          ? (map['timestamp'] is String
              ? DateTime.parse(map['timestamp'] as String)
              : map['timestamp'] as DateTime)
          : null,
      version: map['version'] as int?,
    );
  }
}

Complete Example with Registration

import 'package:eventador/eventador.dart';
import 'package:isar/isar.dart';
import 'package:uuid/uuid.dart'; // Provides Uuid().v4() used below

// Step 1: Define events with fromMap factories
class WalletCreatedEvent extends Event with AggregateEvent, SerializableEvent {
  final String walletId;
  final String name;
  
  WalletCreatedEvent({
    required this.walletId,
    required this.name,
    String? eventId,
    DateTime? timestamp,
  }) : super(
    eventId: eventId ?? Uuid().v4(),
    timestamp: timestamp ?? DateTime.now(),
  );
  
  @override
  String get aggregateId => walletId;
  
  @override
  String get aggregateType => 'Wallet';
  
  @override
  Map<String, dynamic> getEventData() => {
    'walletId': walletId,
    'name': name,
  };
  
  static WalletCreatedEvent fromMap(Map<String, dynamic> map) {
    return WalletCreatedEvent(
      walletId: map['walletId'] as String,
      name: map['name'] as String,
      eventId: map['eventId'] as String?,
      timestamp: map['timestamp'] != null
          ? (map['timestamp'] is String
              ? DateTime.parse(map['timestamp'] as String)
              : map['timestamp'] as DateTime)
          : null,
    );
  }
}

// Step 2: Register all event types during initialization
void registerEventTypes() {
  print('Registering event types for deserialization...');
  
  EventRegistry.register<WalletCreatedEvent>(
    'WalletCreatedEvent',
    (map) => WalletCreatedEvent.fromMap(map),
  );
  
  // Register ALL your event types here
  
  print('✓ Registered ${EventRegistry.registeredTypes.length} event types');
}

// Step 3: Initialize in correct order
Future<void> main() async {
  // 1. Initialize Isar
  await Isar.initializeIsarCore(download: true);
  final isar = await Isar.open([...IsarEventStore.requiredSchemas]);
  
  // 2. Register event types (BEFORE EventStore operations)
  registerEventTypes();
  
  // 3. Create EventStore
  final eventStore = IsarEventStore(isar);
  
  // 4. Create and start ProjectionManager (if using projections)
  final projectionManager = ProjectionManager(eventStore);
  // ... register projections ...
  await projectionManager.start();
  
  // 5. Spawn aggregates and persistent actors
  final actorSystem = LocalActorSystem();
  // ... spawn actors ...
  
  print('System initialized successfully');
}

DateTime Field Handling

Important: When deserializing events from CBOR storage, DateTime fields may be deserialized as either String (ISO-8601) or DateTime objects depending on the serialization layer. Always handle both cases in your fromMap methods:

static MyEvent fromMap(Map<String, dynamic> map) {
  return MyEvent(
    // Handle DateTime fields that might be String or DateTime
    createdAt: map['createdAt'] != null
        ? (map['createdAt'] is String
            ? DateTime.parse(map['createdAt'] as String)
            : map['createdAt'] as DateTime)
        : null,
    // ... other fields
  );
}
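
The String-or-DateTime check can be factored into one helper so that each fromMap stays short. The sketch below is self-contained; `parseDateTimeField` is a hypothetical helper name, not part of Eventador.

```dart
/// Hypothetical helper: accepts null, an ISO-8601 String, or a DateTime.
DateTime? parseDateTimeField(Object? value) {
  if (value == null) return null;
  if (value is DateTime) return value;
  if (value is String) return DateTime.parse(value);
  throw FormatException(
    'Unsupported DateTime representation: ${value.runtimeType}',
  );
}

void main() {
  // Both representations of the same instant yield equal DateTime values.
  final fromString = parseDateTimeField('2024-01-15T10:30:00Z');
  final fromObject = parseDateTimeField(DateTime.utc(2024, 1, 15, 10, 30));
  print(fromString == fromObject); // true

  // Missing fields stay null instead of throwing.
  print(parseDateTimeField(null)); // null
}
```

Inside a fromMap method this would read `createdAt: parseDateTimeField(map['createdAt'])`. Throwing on any other type makes an unexpected encoding visible immediately rather than producing a confusing cast error later.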

Best Practices

  1. Centralize Registration: Create a single registerEventTypes() function that registers all event types in your application
  2. Register Early: Call registration during app initialization, before any event sourcing operations
  3. Type Name Consistency: Use the exact runtime type name (e.g., 'WalletCreatedEvent') as the registration key
  4. Complete Coverage: Register ALL event types used by your aggregates and projections
  5. Version Support: If using event versioning, register all versions of each event type
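
One way to enforce complete coverage is a fail-fast check at startup that compares the type names the application expects with those actually registered. The sketch below is self-contained and works on plain string collections; `missingEventTypes` is a hypothetical helper, and in a real application the registered names would presumably come from the registry (for example the `EventRegistry.registeredTypes` collection used earlier, whose exact type is not shown here).

```dart
/// Returns the expected type names that are absent from [registered].
Set<String> missingEventTypes(
  Iterable<String> expected,
  Iterable<String> registered,
) {
  return expected.toSet().difference(registered.toSet());
}

void main() {
  // Hypothetical names for illustration only.
  const expected = ['WalletCreatedEvent', 'WalletRenamedEvent'];
  const registered = ['WalletCreatedEvent'];

  final missing = missingEventTypes(expected, registered);
  if (missing.isNotEmpty) {
    final names = missing.join(', ');
    print('Unregistered event types: $names');
  }
}
```

Running such a check right after registration turns a recovery-time deserialization error into an immediate startup failure.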

Troubleshooting

Error: Event type 'XYZ' not registered

  • Solution: Add EventRegistry.register<XYZ>('XYZ', (map) => XYZ.fromMap(map)) during initialization

Error: type 'String' is not a subtype of type 'DateTime'

  • Solution: Update your fromMap to handle DateTime fields as both String and DateTime (see example above)

Events deserialize with null fields after restart

  • Solution: Ensure your fromMap method reads from the correct map keys returned by getEventData()
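
Key mismatches between getEventData() and fromMap are easy to catch with a round-trip check. The following self-contained sketch uses a hypothetical `NoteAdded` class that does not extend any Eventador type; it only demonstrates the check itself.

```dart
// Hypothetical event with the same getEventData/fromMap shape as above.
class NoteAdded {
  final String noteId;
  final String text;

  NoteAdded({required this.noteId, required this.text});

  Map<String, dynamic> getEventData() => {
        'noteId': noteId,
        'text': text,
      };

  static NoteAdded fromMap(Map<String, dynamic> map) => NoteAdded(
        noteId: map['noteId'] as String,
        text: map['text'] as String,
      );
}

/// True when every field survives a getEventData -> fromMap round trip.
bool roundTrips(NoteAdded original) {
  final restored = NoteAdded.fromMap(original.getEventData());
  return restored.noteId == original.noteId && restored.text == original.text;
}

void main() {
  print(roundTrips(NoteAdded(noteId: 'n-1', text: 'hello'))); // true
}
```

A check like this belongs in the unit tests of every event class. With non-nullable casts, a misspelled key throws during the round trip instead of silently yielding a null field.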

🚀 Performance

Eventador is designed for high performance:

  • Write Throughput: >10,000 events/second
  • Read Throughput: >50,000 events/second
  • Recovery Time: <100ms for 1,000 events
  • Memory Overhead: <5KB per persistent actor
  • Storage Efficiency: <500 bytes per event average

Performance Tips

  1. Use Snapshots: Configure appropriate snapshot thresholds for your use case
  2. Batch Operations: Use persistEvents() for multiple events
  3. Optimize Serialization: Keep event payloads small and efficient
  4. Monitor Memory: Use snapshot cleanup to prevent unbounded growth
  5. Index Strategy: Leverage Isar's indexing for fast queries
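
Throughput depends heavily on hardware and payload size, so it is worth measuring in your own environment. The sketch below times sequential writes with Stopwatch; `measureEventsPerSecond` is a hypothetical helper, and the callback here only appends to an in-memory list as a stand-in for a real persist call.

```dart
/// Runs [persistOne] [count] times sequentially and returns events/second.
Future<double> measureEventsPerSecond(
  int count,
  Future<void> Function(int index) persistOne,
) async {
  final stopwatch = Stopwatch()..start();
  for (var i = 0; i < count; i++) {
    await persistOne(i);
  }
  stopwatch.stop();

  final seconds =
      stopwatch.elapsedMicroseconds / Duration.microsecondsPerSecond;
  // Guard against a zero reading on very fast runs.
  return seconds == 0 ? double.infinity : count / seconds;
}

Future<void> main() async {
  final log = <int>[]; // In-memory stand-in for an event store.
  final rate = await measureEventsPerSecond(
    10000,
    (i) async => log.add(i),
  );
  print('${rate.toStringAsFixed(0)} events/second (in-memory stand-in)');
}
```

Replacing the callback with a real write against your event store gives a figure that can be compared before and after changing snapshot or batching settings.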

🧪 Testing

Eventador provides comprehensive testing utilities:

import 'package:eventador/eventador.dart';
import 'package:test/test.dart';

void main() {
  group('CounterActor Tests', () {
    late EventStore eventStore;
    late CounterActor counter;

    setUp(() async {
      eventStore = await IsarEventStore.create(directory: null); // In-memory
      counter = CounterActor(
        persistenceId: 'test-counter',
        eventStore: eventStore,
      );
      // Start the actor, then give asynchronous recovery time to complete
      counter.preStart();
      await Future.delayed(Duration(milliseconds: 50));
    });

    tearDown(() async {
      await eventStore.close();
    });

    test('should increment counter', () async {
      await counter.onMessage(IncrementCommand(aggregateId: 'test-counter', value: 5));
      expect(counter.value, equals(5));
    });

    test('should recover state after restart', () async {
      // Add some events
      await counter.onMessage(IncrementCommand(aggregateId: 'test-counter', value: 10));
      
      // Create new instance (simulates restart)
      final recoveredCounter = CounterActor(
        persistenceId: 'test-counter',
        eventStore: eventStore,
      );
      recoveredCounter.preStart();
      await Future.delayed(Duration(milliseconds: 50));
      
      expect(recoveredCounter.value, equals(10));
    });
  });
}

📖 Examples

The example/ directory contains comprehensive examples:

  • Basic Counter: Simple persistent actor with snapshots
  • Order Processing Saga: Complete saga implementation with DuraQ integration
  • Banking System: Account aggregates with business rules
  • E-commerce Platform: Order processing with projections
  • Chat System: Message persistence with read models

🛠️ Development

Building

# Get dependencies
dart pub get

# Generate code (for Isar schemas)
dart run build_runner build

# Run tests
dart test

# Run example
dart run example/eventador_example.dart

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests
  5. Run dart test and dart analyze
  6. Submit a pull request

📋 Roadmap

✅ Completed (Phase 1-3)

  • Core persistent actor framework
  • Isar event store with CBOR serialization
  • Event sourcing patterns (Command/Event/State)
  • Aggregate root implementation
  • Snapshot system with configurable policies
  • Saga pattern with DuraQ integration
  • Event projections and read models

🚧 In Progress (Phase 4)

  • Performance optimizations and batching
  • Comprehensive monitoring and metrics
  • Migration tools and utilities
  • Production-ready documentation

🔮 Future

  • Multi-tenant support
  • Event store clustering
  • GraphQL integration for read models
  • Real-time event streaming
  • Advanced debugging tools

🤝 Ecosystem

Eventador is part of a larger ecosystem:

  • Dactor: Actor model foundation
  • DuraQ: Reliable queue system
  • Isar: High-performance database

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🙏 Acknowledgments

  • Isar Team: For the excellent database foundation
  • Dactor Community: For the actor model inspiration
  • DuraQ Contributors: For reliable queue operations
  • Event Sourcing Community: For patterns and best practices

📞 Support


Built with ❤️ for the Dart and Flutter community

Libraries

eventador
Eventador - Persistence & Event Sourcing Extension for Dactor