Continuum

A flexible event sourcing and domain event modeling framework for Dart and Flutter.

Philosophy

Continuum is built around domain events as a modeling tool. Events describe meaningful state transitions, and aggregates define how those transitions affect domain state. Unlike traditional event sourcing frameworks, Continuum supports multiple usage modes depending on where your source of truth lives.

Three Usage Modes

Mode 1: Event-Driven Mutation (No Persistence)

Use events as typed, explicit state transitions with aggregate validation. Only the final state is persisted (CRUD style); events are neither stored nor replayed.

Use when:

  • Building clean domain models with strong invariants
  • You want explicit mutations without event sourcing overhead
  • Backend uses traditional CRUD persistence

Mode 2: Frontend-Only Event Sourcing

The frontend is the source of truth. Events are persisted locally (SQLite, Hive, etc.) and aggregates are reconstructed by replaying events.

Use when:

  • Building offline-first applications
  • Single-user desktop tools
  • No backend, or a backend used only for sync/backup
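
To make "reconstructed by replaying events" concrete, here is a minimal sketch in plain Dart (it assumes Dart 3 for sealed classes and patterns). It does not use Continuum's API; Note, NoteEvent, NoteCreated, NoteRenamed, and replay are hypothetical names chosen only for illustration.

```dart
// Hypothetical plain-Dart sketch; not Continuum's API.
sealed class NoteEvent {
  const NoteEvent();
}

class NoteCreated extends NoteEvent {
  const NoteCreated(this.title);
  final String title;
}

class NoteRenamed extends NoteEvent {
  const NoteRenamed(this.newTitle);
  final String newTitle;
}

class Note {
  Note(this.title);
  String title;
}

/// Rebuilds a Note by applying its stored events in order.
Note replay(List<NoteEvent> events) {
  if (events.isEmpty || events.first is! NoteCreated) {
    throw StateError('A stream must start with NoteCreated.');
  }
  // The first event creates the aggregate; the rest mutate it.
  final note = Note((events.first as NoteCreated).title);
  for (final event in events.skip(1)) {
    switch (event) {
      case NoteCreated():
        throw StateError('NoteCreated may only appear first.');
      case NoteRenamed(:final newTitle):
        note.title = newTitle;
    }
  }
  return note;
}
```

Calling replay with a NoteCreated followed by a NoteRenamed yields a Note carrying the renamed title. Nothing but the event list needs to be stored, which is why the local event log can act as the source of truth.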

Mode 3: Hybrid Mode (Backend as Source of Truth)

The backend is authoritative; the frontend uses events for optimistic UI. Frontend events are transient and are discarded once the backend confirms the change. The backend may use its own event sourcing or CRUD; the frontend does not need to know which.

Use when:

  • Building responsive UIs with optimistic updates
  • Need undo/cancel before committing
  • Backend handles validation and persistence
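
The idea of transient frontend events can be sketched in plain Dart as a confirmed state plus a list of pending events. This is an illustration under assumptions, not Continuum's API: Profile, EmailEdited, and OptimisticProfile are hypothetical names.

```dart
// Hypothetical plain-Dart sketch of optimistic UI state; not Continuum's API.
class Profile {
  const Profile(this.email);
  final String email;
}

class EmailEdited {
  const EmailEdited(this.newEmail);
  final String newEmail;
}

class OptimisticProfile {
  OptimisticProfile(this._confirmed);

  // Last state the backend confirmed.
  Profile _confirmed;

  // Transient events: kept in memory only, never persisted.
  final List<EmailEdited> _pending = [];

  /// What the UI shows: the confirmed state with pending events applied in order.
  Profile get view => _pending.fold<Profile>(
        _confirmed,
        (state, event) => Profile(event.newEmail),
      );

  /// Records a local edit so the UI can reflect it immediately.
  void apply(EmailEdited event) => _pending.add(event);

  /// Undo/cancel: drop local edits before anything is sent to the backend.
  void cancel() => _pending.clear();

  /// The backend answered: its state wins and local events are discarded.
  void confirm(Profile fromBackend) {
    _confirmed = fromBackend;
    _pending.clear();
  }
}
```

Because the view is always derived from the confirmed state, cancelling is just clearing the pending list, and a backend response that differs from the optimistic guess replaces it without any merge logic.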

Quick Start

Installation

Add to your pubspec.yaml:

dependencies:
  continuum: any # or pin the latest published version

dev_dependencies:
  build_runner: ^2.4.0
  continuum_generator: any # or pin the latest published version

1. Define Your Aggregate

import 'package:continuum/continuum.dart';

part 'user.g.dart';

@Aggregate()
class User with _$UserEventHandlers {
  final String id;
  String name;
  String email;

  User._({required this.id, required this.name, required this.email});

  // Static factory for creating from first event
  static User createFromUserRegistered(UserRegistered event) {
    return User._(
      id: event.userId,
      name: event.name,
      email: event.email,
    );
  }

  // Apply methods define state transitions (override generated mixin)
  @override
  void applyEmailChanged(EmailChanged event) {
    email = event.newEmail;
  }

  @override
  void applyNameChanged(NameChanged event) {
    name = event.newName;
  }
}

2. Define Your Events

import 'package:continuum/continuum.dart';
import 'package:zooper_flutter_core/zooper_flutter_core.dart';

// For Mode 1 (no persistence) you can omit `type:` and serialization.
@AggregateEvent(of: User)
class EmailChanged implements ContinuumEvent {
  EmailChanged({
    required this.userId,
    required this.newEmail,
    EventId? eventId,
    DateTime? occurredOn,
    Map<String, Object?> metadata = const {},
  }) : id = eventId ?? EventId.fromUlid(),
       occurredOn = occurredOn ?? DateTime.now(),
       metadata = Map<String, Object?>.unmodifiable(metadata);

  final String userId;
  final String newEmail;

  @override
  final EventId id;

  @override
  final DateTime occurredOn;

  @override
  final Map<String, Object?> metadata;
}

// For Mode 2/3 (with persistence), add `type:` and `toJson`/`fromJson`.
@AggregateEvent(of: User, type: 'user.registered')
class UserRegistered implements ContinuumEvent {
  UserRegistered({
    required this.userId,
    required this.name,
    required this.email,
    EventId? eventId,
    DateTime? occurredOn,
    Map<String, Object?> metadata = const {},
  }) : id = eventId ?? EventId.fromUlid(),
       occurredOn = occurredOn ?? DateTime.now(),
       metadata = Map<String, Object?>.unmodifiable(metadata);

  final String userId;
  final String name;
  final String email;

  @override
  final EventId id;

  @override
  final DateTime occurredOn;

  @override
  final Map<String, Object?> metadata;

  factory UserRegistered.fromJson(Map<String, dynamic> json) {
    return UserRegistered(
      userId: json['userId'] as String,
      name: json['name'] as String,
      email: json['email'] as String,
      eventId: EventId(json['eventId'] as String),
      occurredOn: DateTime.parse(json['occurredOn'] as String),
      metadata: Map<String, Object?>.from(json['metadata'] as Map),
    );
  }

  Map<String, dynamic> toJson() => {
    'userId': userId,
    'name': name,
    'email': email,
    'eventId': id.toString(),
    'occurredOn': occurredOn.toIso8601String(),
    'metadata': metadata,
  };
}

3. Generate Code

dart run build_runner build

This creates:

  • user.g.dart with event handling mixin
  • lib/continuum.g.dart with $aggregateList (auto-discovered!)

4. Use Your Aggregate

Mode 1: Simple State Transitions

void main() {
  final userId = StreamId('123');

  // Create from a creation event
  final user = User.createFromUserRegistered(
    UserRegistered(userId: userId.value, name: 'Alice', email: 'alice@example.com'),
  );

  // Apply events to mutate state
  user.applyEvent(EmailChanged(userId: userId.value, newEmail: 'alice@company.com'));

  print(user.email); // alice@company.com

  // Save final state to your database (events not persisted)
}

Mode 2: Frontend Event Sourcing

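import 'package:continuum/continuum.dart';
import 'package:continuum_store_memory/continuum_store_memory.dart';

// Also import the generated lib/continuum.g.dart, which provides $aggregateList.

Future<void> main() async {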
  // Setup (zero configuration - $aggregateList auto-discovered!)
  final store = EventSourcingStore(
    eventStore: InMemoryEventStore(),
    aggregates: $aggregateList, // Generated automatically!
  );

  final userId = StreamId('user-123');

  // Create + mutate within a session
  final session = store.openSession();
  session.startStream<User>(
    userId,
    UserRegistered(userId: userId.value, name: 'Alice', email: 'alice@example.com'),
  );
  session.append(userId, EmailChanged(userId: userId.value, newEmail: 'alice@company.com'));
  await session.saveChangesAsync();

  // Load aggregate (reconstructed from events)
  final readSession = store.openSession();
  final user = await readSession.loadAsync<User>(userId);
  print(user.email); // alice@company.com
}

Mode 3: Hybrid with Backend

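// backendApi, updateUI, displayUser, and User.fromBackend are app-specific
// placeholders that you supply; they are not part of Continuum.
Future<void> main() async {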
  // Backend is source of truth.
  // On the frontend, keep transient domain events for optimistic UI.

  final userId = StreamId('user-123');
  final user = await backendApi.fetchUser(userId);

  // User edits email in UI (optimistic)
  final pendingEvents = <ContinuumEvent>[];
  final emailChanged = EmailChanged(userId: userId.value, newEmail: 'new@email.com');
  pendingEvents.add(emailChanged);
  user.applyEvent(emailChanged);

  updateUI(user); // Show immediately

  // Convert to a request DTO and send to backend
  final dto = {'email': user.email};
  final confirmed = await backendApi.updateUser(userId, dto);

  // Discard local events; replace with backend response
  pendingEvents.clear();
  displayUser(User.fromBackend(confirmed));
}

See hybrid_mode_example.dart for a complete example.

Core Concepts

Aggregates

Aggregates are domain objects that encapsulate business logic and invariants. They transition between states by applying events.

@Aggregate()
class Order with _$OrderEventHandlers {
  final String id;
  final List<String> items;
  OrderStatus status; // non-final so apply methods can reassign it
  
  // Constructor, factories, and apply methods...
}

Events

Events represent things that have happened. They are immutable and describe state changes.

import 'package:continuum/continuum.dart';
import 'package:zooper_flutter_core/zooper_flutter_core.dart';

@AggregateEvent(of: Order, type: 'order.item_added') // type required for persistence
class ItemAdded implements ContinuumEvent {
  final String itemId;

  ItemAdded({
    required this.itemId,
    EventId? eventId,
    DateTime? occurredOn,
    Map<String, Object?> metadata = const {},
  }) : id = eventId ?? EventId.fromUlid(),
       occurredOn = occurredOn ?? DateTime.now(),
       metadata = Map<String, Object?>.unmodifiable(metadata);

  @override
  final EventId id;

  @override
  final DateTime occurredOn;

  @override
  final Map<String, Object?> metadata;
}

Sessions

Sessions track pending events and manage aggregate versions. Call saveChangesAsync() to commit events atomically.

final session = store.openSession();

session.startStream<Order>(
  orderId,
  OrderCreated(orderId: orderId.value, customerId: customerId),
);
session.append(orderId, ItemAdded(itemId: 'item-1'));
session.append(orderId, ItemAdded(itemId: 'item-2'));

await session.saveChangesAsync(); // All or nothing

Event Sourcing Store

The EventSourcingStore is your configuration root. It automatically merges all aggregate registries.

final store = EventSourcingStore(
  eventStore: InMemoryEventStore(), // or HiveEventStore
  aggregates: $aggregateList, // Auto-discovered - just run build_runner!
);

Code Generation

Continuum uses code generation to eliminate boilerplate. When you run build_runner, it generates:

  1. Per-aggregate files (user.g.dart):

    • _$UserEventHandlers mixin with event dispatcher
    • applyEvent() extension method
    • replayEvents() for reconstruction
    • createFromEvent() factory
    • Event serialization registry

  2. Global file (lib/continuum.g.dart):

    • $aggregateList with all aggregates in your project
    • Auto-discovered from @Aggregate() annotations

Build Configuration

Add to build.yaml (optional, for customization):

targets:
  $default:
    builders:
      continuum_generator:
        enabled: true

Custom Lints

Continuum can optionally surface common mistakes immediately in the editor using a custom lint plugin.

Why use it?

Some Continuum patterns rely on generated mixins (for example _$UserEventHandlers). If a concrete @Aggregate() class forgets to implement one of the required apply<Event>(...) handlers, Dart can sometimes delay the failure until runtime (or until the class is instantiated, depending on how the type is used).

The continuum_lints package detects this situation early and reports it as a diagnostic while you type.

Setup

Add these dev dependencies:

dev_dependencies:
  custom_lint: ^0.8.1
  continuum_lints: ^3.1.1

Enable the analyzer plugin in your analysis_options.yaml:

analyzer:
  plugins:
    - custom_lint

Optionally, configure which rules are enabled (recommended to keep things explicit):

custom_lint:
  enable_all_lint_rules: false
  rules:
    - continuum_missing_apply_handlers        # For @Aggregate classes
    - continuum_missing_projection_handlers   # For @Projection classes

CI usage

dart analyze does not run custom lints. In CI, run:

dart run custom_lint

Working with Persistence

Event Stores

Continuum provides pluggable event storage:

  • continuum_store_memory: In-memory (testing/development)
  • continuum_store_hive: Local Hive persistence (production)
  • Custom: Implement EventStore interface for your own backend

Optimistic Concurrency

Prevent conflicting writes with version checks:

try {
  await session.saveChangesAsync();
} on ConcurrencyException catch (e) {
  // Handle conflict: reload and retry, or show error to user
  print('Conflict: expected ${e.expectedVersion}, got ${e.actualVersion}');
}
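
The version check behind such a conflict can be illustrated with a small plain-Dart sketch. It is not Continuum's EventStore: TinyStream and VersionConflict are hypothetical names, and events are plain strings to keep the example short.

```dart
// Hypothetical plain-Dart sketch of an expected-version check;
// not Continuum's EventStore or ConcurrencyException.
class VersionConflict implements Exception {
  VersionConflict(this.expectedVersion, this.actualVersion);
  final int expectedVersion;
  final int actualVersion;

  @override
  String toString() =>
      'VersionConflict(expected $expectedVersion, actual $actualVersion)';
}

class TinyStream {
  final List<String> _events = [];

  /// Number of events appended so far.
  int get version => _events.length;

  /// Appends only if the stream is still at [expectedVersion],
  /// i.e. nobody else has written since the caller last read it.
  void append(List<String> newEvents, {required int expectedVersion}) {
    if (expectedVersion != version) {
      throw VersionConflict(expectedVersion, version);
    }
    _events.addAll(newEvents);
  }
}
```

A writer that read the stream at version 0 and appends after another writer has already moved it to version 1 gets a VersionConflict instead of silently interleaving events. The usual responses are the ones named in the comment above: reload and retry, or surface the error.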

Event Serialization

Events are serialized to JSON for storage. Implement toJson() and fromJson():

import 'package:continuum/continuum.dart';
import 'package:zooper_flutter_core/zooper_flutter_core.dart';

@AggregateEvent(of: User, type: 'user.email_changed')
class EmailChanged implements ContinuumEvent {
  EmailChanged({
    required this.userId,
    required this.newEmail,
    EventId? eventId,
    DateTime? occurredOn,
    Map<String, Object?> metadata = const {},
  }) : id = eventId ?? EventId.fromUlid(),
       occurredOn = occurredOn ?? DateTime.now(),
       metadata = Map<String, Object?>.unmodifiable(metadata);

  final String userId;
  final String newEmail;

  @override
  final EventId id;

  @override
  final DateTime occurredOn;

  @override
  final Map<String, Object?> metadata;

  factory EmailChanged.fromJson(Map<String, dynamic> json) {
    return EmailChanged(
      userId: json['userId'] as String,
      newEmail: json['newEmail'] as String,
      eventId: EventId(json['eventId'] as String),
      occurredOn: DateTime.parse(json['occurredOn'] as String),
      metadata: Map<String, Object?>.from(json['metadata'] as Map),
    );
  }

  Map<String, dynamic> toJson() => {
    'userId': userId,
    'newEmail': newEmail,
    'eventId': id.toString(),
    'occurredOn': occurredOn.toIso8601String(),
    'metadata': metadata,
  };
}

The of parameter links the event to its aggregate. The type string identifies the event in storage, so keep it unique and stable: renaming it later means previously stored events can no longer be matched to their class.
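
Why the type string must stay stable is easier to see with a sketch of a type-to-factory lookup. Continuum generates its own serialization registry, so the code below is only an assumption-laden illustration in plain Dart; EventTypeRegistry, EventFactory, and EmailEdited are hypothetical names.

```dart
// Hypothetical plain-Dart sketch; Continuum's generated registry may differ.
typedef EventFactory = Object Function(Map<String, dynamic> json);

class EventTypeRegistry {
  final Map<String, EventFactory> _factories = {};

  /// Registers a factory under a type string; duplicates are rejected.
  void register(String type, EventFactory factory) {
    if (_factories.containsKey(type)) {
      throw ArgumentError.value(type, 'type', 'already registered');
    }
    _factories[type] = factory;
  }

  /// Rebuilds an event from its stored type string and JSON payload.
  Object decode(String type, Map<String, dynamic> json) {
    final factory = _factories[type];
    if (factory == null) {
      throw StateError('Unknown event type "$type"; was it renamed?');
    }
    return factory(json);
  }
}

class EmailEdited {
  EmailEdited(this.newEmail);

  factory EmailEdited.fromJson(Map<String, dynamic> json) =>
      EmailEdited(json['newEmail'] as String);

  final String newEmail;
}
```

Stored events carry only the type string and a JSON payload, so decoding depends entirely on that string still being registered. Two events sharing one string, or a string that changed after events were written, both surface as errors here.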

Examples

Projections (Read Models)

Projections maintain read models that are automatically updated when events occur. This enables CQRS (Command Query Responsibility Segregation) patterns where you have optimized read views separate from your event-sourced aggregates.

Why Use Projections?

  1. Query Efficiency: Read models are optimized for specific queries without reconstructing aggregates.
  2. Denormalization: Combine data from multiple aggregates into a single view.
  3. Performance: Avoid replaying events for every read operation.

Quick Start with Code Generation

Like aggregates, projections use code generation to eliminate boilerplate:

import 'package:continuum/continuum.dart';

part 'user_profile_projection.g.dart';

/// Read model for a user's profile information.
class UserProfile {
  final String name;
  final String email;
  final bool isActive;
  final DateTime lastUpdated;

  const UserProfile({
    required this.name,
    required this.email,
    required this.isActive,
    required this.lastUpdated,
  });

  UserProfile copyWith({String? name, String? email, bool? isActive, DateTime? lastUpdated}) {
    return UserProfile(
      name: name ?? this.name,
      email: email ?? this.email,
      isActive: isActive ?? this.isActive,
      lastUpdated: lastUpdated ?? this.lastUpdated,
    );
  }
}

/// Projection that maintains UserProfile read models.
@Projection(
  name: 'user-profile',
  events: [UserRegistered, EmailChanged, UserDeactivated],
)
class UserProfileProjection
    extends SingleStreamProjection<UserProfile>
    with _$UserProfileProjectionHandlers {

  @override
  UserProfile createInitial(StreamId streamId) => UserProfile(
        name: '',
        email: '',
        isActive: true,
        lastUpdated: DateTime.utc(1970),
      );

  @override
  UserProfile applyUserRegistered(UserProfile current, UserRegistered event) {
    return UserProfile(
      name: event.name,
      email: event.email,
      isActive: true,
      lastUpdated: event.occurredOn,
    );
  }

  @override
  UserProfile applyEmailChanged(UserProfile current, EmailChanged event) {
    return current.copyWith(
      email: event.newEmail,
      lastUpdated: event.occurredOn,
    );
  }

  @override
  UserProfile applyUserDeactivated(UserProfile current, UserDeactivated event) {
    return current.copyWith(
      isActive: false,
      lastUpdated: event.occurredOn,
    );
  }
}

The @Projection annotation declares:

  • name: Unique identifier for position tracking (use stable names)
  • events: List of event types this projection handles

The generator creates:

  • _$UserProfileProjectionHandlers mixin with handledEventTypes, projectionName, and apply()
  • Abstract apply<EventName>() methods for each event type
  • $UserProfileProjection bundle for registration
  • $projectionList in lib/continuum.g.dart with all projections

Multi-Stream Projections

Aggregate data across multiple streams (e.g., statistics, dashboards):

/// Read model for system-wide user statistics.
class UserStatistics {
  final int totalUsers;
  final int activeUsers;

  const UserStatistics({required this.totalUsers, required this.activeUsers});
}

/// Projection that tracks statistics across all user streams.
@Projection(
  name: 'user-statistics',
  events: [UserRegistered, UserDeactivated, UserReactivated],
)
class UserStatisticsProjection
    extends MultiStreamProjection<UserStatistics, String>
    with _$UserStatisticsProjectionHandlers {

  // Multi-stream projections require custom key extraction
  @override
  String extractKey(StoredEvent event) => 'global';

  @override
  UserStatistics createInitial(String key) =>
      const UserStatistics(totalUsers: 0, activeUsers: 0);

  @override
  UserStatistics applyUserRegistered(UserStatistics current, UserRegistered event) {
    return UserStatistics(
      totalUsers: current.totalUsers + 1,
      activeUsers: current.activeUsers + 1,
    );
  }

  @override
  UserStatistics applyUserDeactivated(UserStatistics current, UserDeactivated event) {
    return UserStatistics(
      totalUsers: current.totalUsers,
      activeUsers: current.activeUsers - 1,
    );
  }

  @override
  UserStatistics applyUserReactivated(UserStatistics current, UserReactivated event) {
    return UserStatistics(
      totalUsers: current.totalUsers,
      activeUsers: current.activeUsers + 1,
    );
  }
}

Projection Lifecycle

Projections support two execution modes:

Inline (Strongly Consistent)

Projections execute during saveChangesAsync(). The read model is always up to date with the event store.

final registry = ProjectionRegistry();
final userProfileStore = InMemoryReadModelStore<UserProfile, StreamId>();

registry.registerInline(
  UserProfileProjection(),
  userProfileStore,
);

// Pass to EventSourcingStore
final store = EventSourcingStore(
  eventStore: InMemoryEventStore(),
  aggregates: $aggregateList,
  projections: registry, // Inline projections execute automatically
);

Async (Eventually Consistent)

Projections execute in the background after events are appended. Better for high-throughput scenarios.

final registry = ProjectionRegistry();
final statisticsStore = InMemoryReadModelStore<UserStatistics, String>();

registry.registerAsync(
  UserStatisticsProjection(),
  statisticsStore,
);

// Create background processor
final processor = PollingProjectionProcessor(
  eventStore: eventStore, // Your event store instance; must implement ProjectionEventStore
  executor: AsyncProjectionExecutor(registry: registry),
  positionStore: InMemoryProjectionPositionStore(),
  pollInterval: Duration(seconds: 1),
);

// Start processing
await processor.startAsync();

// ... later
await processor.stopAsync();

Schema Change Detection

When you modify a projection's event list, the generated schemaHash changes. On startup:

  1. The system compares the stored schema hash with the current one
  2. If they differ, the projection is marked stale and rebuilds from scratch
  3. Read models may be returned with isStale: true while the rebuild is in progress

This ensures projections always reflect their current event definitions.
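
The comparison in step 1 can be sketched as follows. How Continuum actually computes schemaHash is not specified here, so this plain-Dart example simply assumes a deterministic hash over the projection name and its event type names; schemaHashFor and needsRebuild are hypothetical helpers.

```dart
// Hypothetical plain-Dart sketch; Continuum's generated schemaHash
// may be computed differently.

/// Deterministic hash over the projection name and its event type names.
/// Event names are sorted first, so reordering the list does not change
/// the hash (a design choice made for this sketch).
String schemaHashFor(String projectionName, List<String> eventTypes) {
  final sorted = [...eventTypes]..sort();
  final input = '$projectionName|${sorted.join(',')}';
  var hash = 7;
  for (final unit in input.codeUnits) {
    // Simple polynomial hash, masked to 30 bits so it also behaves
    // the same when compiled to JavaScript.
    hash = (hash * 31 + unit) & 0x3fffffff;
  }
  return hash.toRadixString(16).padLeft(8, '0');
}

/// A projection needs a rebuild when no hash was stored yet or it differs.
bool needsRebuild({required String? storedHash, required String currentHash}) =>
    storedHash != currentHash;
```

With this scheme, adding UserDeactivated to a projection's event list changes the hash and triggers a rebuild, while merely reordering the list does not.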

Read Model Storage

Projections store their state in ReadModelStore implementations:

// In-memory storage (for testing or volatile read models)
final store = InMemoryReadModelStore<UserProfile, StreamId>();

// Load a read model
final profile = await store.loadAsync(userId);

// Custom storage - implement the interface
class PostgresReadModelStore<T, K> implements ReadModelStore<T, K> {
  @override
  Future<T?> loadAsync(K key) async { /* ... */ }

  @override
  Future<void> saveAsync(K key, T readModel) async { /* ... */ }

  @override
  Future<void> deleteAsync(K key) async { /* ... */ }
}

Lint Support

The continuum_lints package provides editor-time checks for projections:

custom_lint:
  rules:
    - continuum_missing_projection_handlers

This rule detects missing apply<EventName> implementations in @Projection classes and offers quick-fixes to generate stubs.

Contributing

See the repository for contribution guidelines.
