gossip_event_sourcing 1.0.4 copy "gossip_event_sourcing: ^1.0.4" to clipboard
gossip_event_sourcing: ^1.0.4 copied to clipboard

Event Sourcing and CQRS library for Dart applications using the Gossip protocol. Provides core components for building scalable event-driven architectures.

Gossip Event Sourcing #

A framework-agnostic Dart library for implementing Event Sourcing and CQRS (Command Query Responsibility Segregation) patterns in distributed applications.

Features #

  • 🚀 Event Processing: Coordinate event processing through multiple projections
  • 📊 Projections: Build and maintain read models from events
  • 💾 Projection Store: Optional persistent storage for projection states (dramatically improves startup performance)
  • 🔄 State Restoration: Automatic state restoration with graceful fallbacks
  • 🛡️ Version Compatibility: Handle projection schema changes safely
  • 🎯 Framework Agnostic: Works with any Dart application, not just Flutter
  • 📈 High Performance: Optimized for large numbers of events

Installation #

Add this to your pubspec.yaml:

dependencies:
  gossip_event_sourcing:
    path: ../gossip_event_sourcing  # Update path as needed

Quick Start #

1. Create a Projection #

import 'package:gossip_event_sourcing/gossip_event_sourcing.dart';

class CounterProjection extends Projection with ProjectionChangeNotifier {
  int _count = 0;
  
  int get count => _count;

  @override
  Future<void> apply(Event event) async {
    if (event.payload['type'] == 'increment') {
      _count++;
      notifyListeners();
    } else if (event.payload['type'] == 'decrement') {
      _count--;
      notifyListeners();
    }
  }

  @override
  Future<void> reset() async {
    _count = 0;
    notifyListeners();
  }

  @override
  Map<String, dynamic> getState() {
    return {'count': _count};
  }

  @override
  Future<bool> restoreState(Map<String, dynamic> state) async {
    try {
      _count = state['count'] as int;
      notifyListeners();
      return true;
    } catch (e) {
      return false;
    }
  }
}

2. Create a Projection Store (Optional) #

class MyProjectionStore implements ProjectionStore {
  final Map<String, ProjectionStateSnapshot> _states = {};

  @override
  Future<void> initialize() async {
    // Initialize your storage backend
  }

  @override
  Future<void> saveProjectionState(
    String projectionType,
    Map<String, dynamic> state,
    String? lastProcessedEventId,
    int eventCount,
  ) async {
    _states[projectionType] = ProjectionStateSnapshot(
      projectionType: projectionType,
      state: state,
      lastProcessedEventId: lastProcessedEventId,
      eventCount: eventCount,
      savedAt: DateTime.now(),
      version: '1.0.0',
    );
  }

  @override
  Future<ProjectionStateSnapshot?> loadProjectionState(String projectionType) async {
    return _states[projectionType];
  }

  // Implement other methods...
}

3. Set Up Event Processing #

void main() async {
  // Optional: Create projection store for better performance
  final projectionStore = MyProjectionStore();
  await projectionStore.initialize();

  // Create event processor
  final eventProcessor = EventProcessor(
    projectionStore: projectionStore,
    storeConfig: const ProjectionStoreConfig(),
    logger: print, // Optional: for debugging
  );

  // Register projections
  final counterProjection = CounterProjection();
  eventProcessor.registerProjection(counterProjection);

  // Listen to projection changes
  counterProjection.addListener(() {
    print('Counter is now: ${counterProjection.count}');
  });

  // Process events
  await eventProcessor.processEvent(MyEvent(
    id: '1',
    payload: {'type': 'increment'},
  ));
}

Architecture #

Event Sourcing Flow #

Events → EventProcessor → Projections → UI/API
    ↓
ProjectionStore (optional)
  1. Events are processed through the EventProcessor
  2. Projections build read models by applying events
  3. ProjectionStore optionally saves projection states for fast startup
  4. UI/API reads from projections for queries

Performance Optimization #

Without projection store:

App Start → Load All Events → Replay All Events → Ready

With projection store:

App Start → Load Saved States → Process Recent Events → Ready

Configuration #

ProjectionStoreConfig Options #

// Default (recommended)
const ProjectionStoreConfig()

// High performance (less frequent saves)
const ProjectionStoreConfig.highPerformance()

// Maximum durability (frequent saves)
const ProjectionStoreConfig.maxDurability()

// Disabled (no projection store)
const ProjectionStoreConfig.disabled()

Custom Configuration #

const ProjectionStoreConfig(
  autoSaveEnabled: true,
  autoSaveInterval: 100,     // Save every 100 events
  saveAfterBatch: true,      // Save after processing batches
  loadOnRebuild: true,       // Try loading on startup
)

Best Practices #

1. Projection Versioning #

Always increment stateVersion when changing your projection's state format:

class MyProjection extends Projection {
  @override
  String get stateVersion => '2.0.0'; // Increment when breaking changes occur
  
  @override
  bool isStateCompatible(String savedVersion) {
    // Custom compatibility logic if needed
    return savedVersion == stateVersion;
  }
}

2. Error Handling #

Make your restoreState method robust:

@override
Future<bool> restoreState(Map<String, dynamic> state) async {
  try {
    // Restore state logic
    return true;
  } catch (e) {
    // Log error and reset to clean state
    await reset();
    return false; // Falls back to event replay
  }
}

3. Testing #

Test projections with and without saved states:

test('projection works with saved state', () async {
  final projection = MyProjection();
  
  // Test normal event processing
  await projection.apply(event);
  final state = projection.getState();
  
  // Test state restoration
  final newProjection = MyProjection();
  final restored = await newProjection.restoreState(state);
  
  expect(restored, isTrue);
  expect(newProjection.getState(), equals(state));
});

Framework Integration #

Flutter Integration #

// In your Flutter app
class MyProjection extends Projection with ProjectionChangeNotifier {
  // Your projection implementation
}

// Use with Provider or similar state management
ChangeNotifierProvider(
  create: (_) => counterProjection,
  child: MyApp(),
)

Console Application #

// In a console app
void main() async {
  final eventProcessor = EventProcessor(
    logger: (message) => print('[EventProcessor] $message'),
  );
  
  // Process events from any source
  await eventProcessor.processEvents(eventsFromDatabase);
}

Advanced Usage #

Custom Event Types #

Implement your own Event interface:

class MyEvent implements Event {
  @override
  final String id;
  
  @override
  final String nodeId;
  
  @override
  final int timestamp;
  
  @override
  final int creationTimestamp;
  
  @override
  final Map<String, dynamic> payload;
  
  MyEvent({
    required this.id,
    required this.nodeId,
    required this.timestamp,
    required this.creationTimestamp,
    required this.payload,
  });
}

Multiple Projections #

Register multiple projections for different views of the same data:

eventProcessor.registerProjection(UserListProjection());
eventProcessor.registerProjection(UserStatsProjection());
eventProcessor.registerProjection(AdminDashboardProjection());

Monitoring #

Get insights into your event processing:

final stats = eventProcessor.getProjectionStoreStats();
print('Total states: ${stats?.totalStates}');
print('Last save: ${stats?.lastSaveTime}');

print('Processed events: ${eventProcessor.processedEventCount}');
print('Active projections: ${eventProcessor.projections.length}');

Contributing #

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for your changes
  4. Ensure all tests pass
  5. Create a pull request

License #

MIT License - see LICENSE file for details.

0
likes
150
points
26
downloads

Documentation

Documentation
API reference

Publisher

verified publisherneutrinographics.com

Weekly Downloads

Event Sourcing and CQRS library for Dart applications using the Gossip protocol. Provides core components for building scalable event-driven architectures.

Repository (GitHub)
View/report issues

Topics

#event-sourcing #cqrs #distributed-systems #gossip-protocol #command-query

License

BSD-3-Clause (license)

Dependencies

gossip

More

Packages that depend on gossip_event_sourcing