crdt_socket_sync 0.2.0 copy "crdt_socket_sync: ^0.2.0" to clipboard
crdt_socket_sync: ^0.2.0 copied to clipboard

Conflict-free replicated data type (CRDT) - Socket sync implementation provided in dart

CRDT Socket Sync #

A comprehensive Dart package for synchronizing Conflict-free Replicated Data Types (CRDTs) between multiple clients and a server.

Overview #

CRDT Socket Sync provides a robust, real-time synchronization system that allows multiple clients to collaborate on shared documents without conflicts. Built on top of crdt_lf, this package enables seamless data synchronization with automatic conflict resolution.

Features #

  • 🔄 Real-time Synchronization: Instant propagation of changes across all connected clients
  • 🌐 WebSocket Support: Built-in WebSocket client and server implementations
  • 🔧 Conflict Resolution: Automatic conflict-free merge of concurrent operations
  • 📦 Compression: Optional data compression for efficient network usage
  • 🔌 Modular Architecture: Separate client and server components with clean abstractions
  • 📡 Automatic Reconnection: Robust connection handling with automatic retry logic
  • 🎯 Type Safety: Full Dart type safety with generic document handlers
  • 📊 Event Monitoring: Comprehensive event streams for connection and synchronization monitoring
  • 🔌 Plugins: Extendable plugin system for custom functionality

Built-in Plugins #

  • 📡 Awareness Plugin: Track the awareness of the clients.

Installation #

Add this package to your pubspec.yaml:

dependencies:
  crdt_socket_sync: 
  crdt_lf:

Then run:

dart pub get

Quick Start #

Server Setup #

import 'package:crdt_socket_sync/src/web_socket_server.dart';

void main() async {
  // Create a server registry to manage documents
  final registry = InMemoryServerRegistry();
  
  // Create and start the WebSocket server
  final server = WebSocketServer(
    host: 'localhost',
    port: 8080,
    serverRegistry: registry,
  );
  
  await server.start();
  print('Server started on localhost:8080');
  
  // Listen to server events
  server.serverEvents.listen((event) {
    print('Server event: ${event.type} - ${event.message}');
  });
}

Client Setup #

import 'package:crdt_socket_sync/src/web_socket_client.dart';
import 'package:crdt_lf/crdt_lf.dart';

void main() async {
  // Create a CRDT document
  final document = CRDTDocument.create(peerId: PeerId.generate());
  
  // Register handlers for different data types
  final listHandler = CRDTListHandler<String>(document, 'shared_list');
  document.registerHandler(listHandler);
  
  // Create the client
  final client = WebSocketClient(
    url: 'ws://localhost:8080',
    document: document,
    author: document.peerId,
  );
  
  // Monitor connection status
  client.connectionStatus.listen((status) {
    print('Connection status: $status');
  });
  
  // Connect to server
  final connected = await client.connect();
  if (connected) {
    print('Connected successfully!');
    
    // Make changes to the document
    listHandler.insert(0, 'Hello, World!');
  }
}

Plugins #

The package provides a plugin system that allows you to extend the functionality of the client and the server. A plugin can be only on the client or only on the server or both.

// it's important that `MyClientPlugin` extends `ClientSyncPlugin` (not implements).
// `ClientSyncPlugin` makes some "magic" to make the plugin work.
class MyClientPlugin extends ClientSyncPlugin {
  @override
  void onMessage(Message message) {
    print('message: $message');
  }
}

// it's important that `MyServerPlugin` extends `ServerSyncPlugin` (not implements).
// `ServerSyncPlugin` makes some "magic" to make the plugin work.
class MyServerPlugin extends ServerSyncPlugin {
  @override
  void onMessage(Message message) {
    print('message: $message');
  }
}

final client = WebSocketClient(
  url: 'ws://localhost:8080',
  document: document,
  author: document.peerId,
  plugins: [MyClientPlugin()],
);

final server = WebSocketServer(
  host: 'localhost',
  port: 8080,
  serverRegistry: registry,
  plugins: [MyServerPlugin()],
);

A plugin can send new message types to clients and the server. To do so, it must extend the Message class and implement the fromJson method to decode the messages.

Awareness Plugin

The awareness plugin is a plugin that allows you to track the awareness of the clients. It is a plugin that is both on the client and the server. It is used to track the awareness of the clients and to send the awareness to the server and to the clients.

The example provided uses the awareness plugin to track the active users (name, surname, random color) and their relative position in the document.

awareness_plugin

Examples #

This package provided some examples:

In the example/ directory you can find a complete working example of the server and the client (dart).

In the flutter_example/ directory you can find a complete working example of a flutter app that uses the server and the client.

Try to run the server example and some client applications (or the dart client). The workspace contains a .vscode folder with the launch settings for the examples.

The server example and the flutter example already use the awareness plugin.

sync_server_multi_client

Library Structure #

The package is organized into several modules for different use cases:

Import Options #

For Client Applications

// Basic client interfaces
import 'package:crdt_socket_sync/client.dart';

// WebSocket client implementation
import 'package:crdt_socket_sync/web_socket_client.dart';

For Server Applications

// Basic server interfaces
import 'package:crdt_socket_sync/server.dart';

// WebSocket server implementation
import 'package:crdt_socket_sync/web_socket_server.dart';

Advanced Usage #

Custom Document Registry #

Implement your own document storage backend:

class CustomServerRegistry implements CRDTServerRegistry {
  final Map<String, CRDTDocument> _documents = {};
  
  @override
  CRDTDocument? getDocument(String documentId) {
    return _documents[documentId];
  }
  
  @override
  void setDocument(String documentId, CRDTDocument document) {
    _documents[documentId] = document;
  }
  
  @override
  List<String> get documentIds => _documents.keys.toList();
}

Compression #

Enable data compression for reduced bandwidth usage:

// Server with compression
final server = WebSocketServer(
  host: 'localhost',
  port: 8080,
  serverRegistry: registry,
  compressor: GzipCompressor(), // or your custom compressor
);

// Client with compression
final client = WebSocketClient(
  url: 'ws://localhost:8080',
  document: document,
  author: author,
  compressor: GzipCompressor(),
);

Event Monitoring #

Monitor detailed synchronization events:

// Server events
server.serverEvents.listen((event) {
  switch (event.type) {
    case ServerEventType.clientConnected:
      print('Client ${event.data?['clientId']} connected');
      break;
    case ServerEventType.clientDisconnected:
      print('Client ${event.data?['clientId']} disconnected');
      break;
    case ServerEventType.clientChangeApplied:
      print('Change applied from client');
      break;
  }
});

// Client messages
client.messages.listen((message) {
  switch (message.type) {
    case MessageType.change:
      print('Received change from server');
      break;
    case MessageType.snapshot:
      print('Received snapshot from server');
      break;
  }
});

Protocol Details #

The synchronization protocol includes:

  • Handshake: Initial client-server negotiation with version exchange
  • Changes: Real-time propagation of CRDT operations
  • Snapshots: Full document state synchronization
  • Ping/Pong: Connection health monitoring
  • Error Handling: Graceful error recovery and reporting

Error Handling #

The library provides robust error handling with automatic recovery:

client.connectionStatus.listen((status) {
  switch (status) {
    case ConnectionStatus.connected:
      // Normal operation
      break;
    case ConnectionStatus.reconnecting:
      // Automatic reconnection in progress
      break;
    case ConnectionStatus.error:
      // Handle connection errors
      break;
    case ConnectionStatus.disconnected:
      // Clean disconnection
      break;
  }
});

Packages #

Other bricks of the crdt "system" are:

Roadmap #

A roadmap is available in the project page. The roadmap provides a high-level overview of the project's goals and the current status of the project.

Contributing #

Contributions are welcome! Please read the contributing guidelines and submit pull requests to the main repository.

1
likes
160
points
24
downloads

Publisher

verified publishermattiapispisa.it

Weekly Downloads

Conflict-free replicated data type (CRDT) - Socket sync implementation provided in dart

Homepage
Repository (GitHub)

Topics

#crdt #socket #sync #fugue #hlc

Documentation

API reference

License

MIT (license)

Dependencies

crdt_lf, hlc_dart, web_socket_channel

More

Packages that depend on crdt_socket_sync