dactor 1.1.0 copy "dactor: ^1.1.0" to clipboard
dactor: ^1.1.0 copied to clipboard

An actor system for Dart

Dactor - Actor System for Dart #

A lightweight, high-performance actor model implementation for Dart

Dactor provides a robust actor system for building concurrent, fault-tolerant, and scalable applications in Dart. It implements the actor model with features like supervision trees, message passing, ask patterns with reliability, metrics, and pooling.

Pub Version Dart SDK License

โœจ Features #

  • ๐ŸŽฏ Pure Actor Model: Type-safe message passing with actor isolation
  • ๐Ÿ”„ Reliable Ask Pattern: Request-response with configurable retries and exponential backoff
  • ๐Ÿ›ก๏ธ Fault Tolerance: Supervision trees with customizable recovery strategies
  • ๐Ÿ“ก Event Bus: Publish-subscribe messaging for event-driven architectures
  • โฐ Actor Timers: Schedule messages with single-shot, fixed-delay, and fixed-rate timers
  • โšก High Performance: >29K messages/sec throughput, <1ms latency
  • ๐Ÿ“Š Built-in Observability: Comprehensive metrics, tracing, and monitoring
  • ๐ŸŽ›๏ธ Actor Pooling: Scale with worker pools and round-robin routing
  • ๐Ÿ’พ Memory Efficient: <1KB overhead per actor
  • ๐Ÿ” Dead Letter Queue: Handle undeliverable messages gracefully
  • ๐Ÿš€ Zero Dependencies: Pure Dart implementation

๐Ÿš€ Quick Start #

Add to your pubspec.yaml:

dependencies:
  dactor: ^1.0.0

Basic Example #

import 'package:dactor/dactor.dart';

// Define an actor
class CounterActor extends Actor {
  int _count = 0;

  @override
  Future<void> onMessage(dynamic message) async {
    if (message is LocalMessage) {
      switch (message.payload) {
        case 'increment':
          _count++;
          print('Count: $_count');
          message.sender?.tell(LocalMessage(payload: _count));
          break;
        case 'get':
          message.sender?.tell(LocalMessage(payload: _count));
          break;
      }
    }
  }

  @override
  void preStart() => print('Counter actor started');

  @override
  void postStop() => print('Counter actor stopped');
}

void main() async {
  // Create actor system
  final system = LocalActorSystem();
  
  // Spawn an actor
  final counter = await system.spawn('counter', () => CounterActor());
  
  // Send messages (fire-and-forget)
  counter.tell(LocalMessage(payload: 'increment'));
  counter.tell(LocalMessage(payload: 'increment'));
  
  // Ask pattern (request-response)
  final count = await counter.ask(
    LocalMessage(payload: 'get'),
    Duration(seconds: 1),
  );
  print('Current count: ${(count as LocalMessage).payload}');
  
  await system.shutdown();
}

๐Ÿ“š Core Concepts #

Actor Lifecycle #

Actors have a well-defined lifecycle managed by the system:

class MyActor extends Actor {
  @override
  void preStart() {
    // Called when actor starts
    print('Actor ${context.self.id} starting');
  }

  @override
  Future<void> onMessage(dynamic message) async {
    // Handle incoming messages
    print('Received: $message');
  }

  @override
  void postStop() {
    // Called when actor stops (cleanup)
    print('Actor ${context.self.id} stopped');
  }
}

Message Passing #

Messages are the primary communication mechanism:

// Simple message
counter.tell(LocalMessage(payload: 'increment'));

// Message with sender - IMPORTANT: Pass sender as separate parameter
counter.tell(LocalMessage(payload: 'ping'), sender: otherActor);

// Custom message types
class CustomMessage implements Message {
  final String data;
  CustomMessage(this.data);
  
  // Implement Message interface
  @override
  String get correlationId => 'custom';
  @override
  Map<String, dynamic> get metadata => {};
  @override
  ActorRef? get replyTo => null;
  @override
  DateTime get timestamp => DateTime.now();
}

โš ๏ธ Critical: Proper Sender Passing

When sending messages that expect a reply, always pass the sender as a separate parameter to the tell() method:

// โœ… CORRECT: Sender passed as separate parameter
actor.tell(LocalMessage(payload: 'request'), sender: probe.ref);

// โŒ INCORRECT: Trying to embed sender in message
actor.tell(LocalMessage(payload: 'request', sender: probe.ref)); // This won't work!

The actor system automatically sets context.sender when processing messages, but only when the sender is passed correctly as a parameter to tell().

๐ŸŽฏ Ask Pattern with Reliability #

The ask pattern provides reliable request-response messaging with configurable retries:

Basic Ask Pattern #

// Simple ask with timeout
final response = await actor.ask('ping', Duration(seconds: 1));

// With custom message
final result = await actor.ask(
  LocalMessage(payload: 'get_data'),
  Duration(seconds: 5),
);

Configurable Reliability #

// Development configuration (longer timeouts, more retries)
final system = ActorSystem.create(ActorSystemConfig(
  askConfig: AskConfig.development(),
));

// Production configuration (shorter timeouts, fewer retries)
final system = ActorSystem.create(ActorSystemConfig(
  askConfig: AskConfig.production(),
));

// Custom configuration
final system = ActorSystem.create(ActorSystemConfig(
  askConfig: AskConfig(
    defaultTimeout: Duration(seconds: 3),
    maxRetries: 2,
    retryBackoffBase: Duration(milliseconds: 100),
    retryBackoffMultiplier: 2.0,
    maxBackoffDuration: Duration(seconds: 5),
  ),
));

// Disable retries entirely
final system = ActorSystem.create(ActorSystemConfig(
  askConfig: AskConfig.noRetries(),
));

Exponential Backoff #

The system automatically calculates backoff delays:

final config = AskConfig(
  retryBackoffBase: Duration(milliseconds: 100),
  retryBackoffMultiplier: 2.0,
);

// Retry attempts will wait:
// 1st retry: 100ms
// 2nd retry: 200ms  
// 3rd retry: 400ms
// 4th retry: 800ms (and so on...)

๐Ÿ›ก๏ธ Supervision and Fault Tolerance #

Actors can supervise children and handle failures gracefully:

class WorkerActor extends Actor {
  @override
  Future<void> onMessage(dynamic message) async {
    if (message == 'fail') {
      throw Exception('Simulated failure');
    }
    // Handle other messages...
  }
}

class MySupervisor extends SupervisorActor {
  MySupervisor() : super(
    AllForOneStrategy(
      decider: (error, stackTrace) => SupervisionDecision.restart,
      maxRetries: 3,
    ),
  );

  @override
  Future<void> onMessage(dynamic message) async {
    if (message == 'create_worker') {
      await supervise('worker', () => WorkerActor());
    }
  }
}

// Usage
final supervisor = await system.spawn('supervisor', () => MySupervisor());
supervisor.tell('create_worker');

Supervision Strategies #

OneForOneStrategy: Only the failed actor is affected

final strategy = OneForOneStrategy(
  decider: (error, stackTrace) => SupervisionDecision.restart,
  maxRetries: 3,
);

AllForOneStrategy: All supervised actors are affected by one failure

final strategy = AllForOneStrategy(
  decider: (error, stackTrace) => SupervisionDecision.restart,
  maxRetries: 1,
);

Supervision Decisions:

  • SupervisionDecision.restart - Restart the failed actor
  • SupervisionDecision.stop - Stop the failed actor permanently
  • SupervisionDecision.escalate - Pass the failure up to the parent

โšก Actor Pooling for Scalability #

Scale your actors with worker pools:

// Create a pool of 4 worker actors
final router = await system.spawn(
  'worker-router',
  () => WorkerActor(),
  pool: Pool(workerCount: 4),
);

// Messages are distributed round-robin to workers
for (int i = 0; i < 100; i++) {
  router.tell(LocalMessage(payload: 'task_$i'));
}

โฐ Actor Timers for Scheduled Messages #

Dactor provides Akka-style Timer actors that allow you to schedule messages to be sent to an actor at specific times or intervals. Timers are automatically bound to the actor's lifecycle and are cancelled when the actor is stopped or restarted.

Timer Types #

Single-Shot Timers: Send a message once after a delay

class TimeoutActor extends Actor {
  @override
  void preStart() {
    // Send timeout message after 30 seconds
    context.timers.startSingleTimer(
      'timeout', 
      'session-expired', 
      Duration(seconds: 30)
    );
  }
  
  @override
  Future<void> onMessage(dynamic message) async {
    if (message == 'activity') {
      // Reset timeout on activity
      context.timers.startSingleTimer(
        'timeout', 
        'session-expired', 
        Duration(seconds: 30)
      );
    } else if (message == 'session-expired') {
      print('Session expired due to inactivity');
      context.system.stop(context.self);
    }
  }
}

Fixed Delay Timers: Send messages repeatedly with consistent spacing

class HeartbeatActor extends Actor {
  @override
  void preStart() {
    // Send heartbeat every 30 seconds with fixed delay
    context.timers.startTimerWithFixedDelay(
      'heartbeat', 
      'send-heartbeat', 
      Duration(seconds: 30)
    );
  }
  
  @override
  Future<void> onMessage(dynamic message) async {
    if (message == 'send-heartbeat') {
      await sendHeartbeatToServer();
      print('Heartbeat sent');
    }
  }
  
  Future<void> sendHeartbeatToServer() async {
    // Implementation for sending heartbeat
  }
}

Fixed Rate Timers: Send messages at exact intervals with catch-up behavior

class MetricsCollectorActor extends Actor {
  @override
  void preStart() {
    // Collect metrics every minute at fixed rate
    context.timers.startTimerAtFixedRate(
      'collect', 
      'collect-metrics', 
      Duration(minutes: 1)
    );
  }
  
  @override
  Future<void> onMessage(dynamic message) async {
    if (message == 'collect-metrics') {
      final metrics = await collectSystemMetrics();
      await storeMetrics(metrics);
    }
  }
}

Timer Management #

Key-Based Timer Replacement

class RequestTimeoutActor extends Actor {
  @override
  Future<void> onMessage(dynamic message) async {
    if (message is StartRequest) {
      // Set timeout for this request (replaces any existing timeout)
      context.timers.startSingleTimer(
        'request-timeout',
        RequestTimeout(message.requestId),
        Duration(seconds: 5)
      );
    } else if (message is RequestCompleted) {
      // Cancel timeout as request completed successfully
      context.timers.cancel('request-timeout');
    } else if (message is RequestTimeout) {
      print('Request ${message.requestId} timed out');
    }
  }
}

Timer Lifecycle Management

class CacheManagerActor extends Actor {
  @override
  void preStart() {
    // Start periodic cleanup
    context.timers.startTimerWithFixedDelay(
      'cleanup', 'cleanup-expired', Duration(minutes: 5));
    
    // Start periodic stats collection
    context.timers.startTimerAtFixedRate(
      'stats', 'collect-stats', Duration(minutes: 1));
  }
  
  @override
  Future<void> onMessage(dynamic message) async {
    switch (message) {
      case 'cleanup-expired':
        await cleanupExpiredEntries();
        break;
      case 'collect-stats':
        await collectCacheStats();
        break;
      case 'shutdown':
        // Cancel all timers before shutdown
        context.timers.cancelAll();
        break;
    }
  }
  
  @override
  void postStop() {
    // Timers are automatically cancelled when actor stops
    print('Cache manager stopped - all timers cancelled');
  }
}

Timer API Reference #

// Start timers
context.timers.startSingleTimer(key, message, delay);
context.timers.startTimerWithFixedDelay(key, message, delay);
context.timers.startTimerAtFixedRate(key, message, interval);

// Manage timers  
context.timers.cancel(key);           // Cancel specific timer
context.timers.cancelAll();           // Cancel all timers
context.timers.isTimerActive(key);    // Check if timer is active
context.timers.activeTimers;          // List all active timer keys

Timer Features #

  • Automatic Cleanup: Timers are automatically cancelled when actors stop or restart
  • Key-Based Replacement: Starting a timer with an existing key cancels the previous timer
  • Lifecycle Bound: Timer messages are guaranteed not to be received after actor termination
  • High Performance: Built on Dart's efficient Timer implementation
  • Memory Safe: No memory leaks from cancelled or completed timers

Common Timer Patterns #

Session Management

class SessionActor extends Actor {
  @override
  void preStart() {
    scheduleExpiration();
  }
  
  void scheduleExpiration() {
    context.timers.startSingleTimer(
      'expire', 'expire-session', Duration(minutes: 30));
  }
  
  @override
  Future<void> onMessage(dynamic message) async {
    switch (message) {
      case UserActivity():
        scheduleExpiration(); // Reset expiration timer
        break;
      case 'expire-session':
        print('Session expired');
        context.system.stop(context.self);
        break;
    }
  }
}

Rate Limiting

class RateLimiterActor extends Actor {
  int _requestCount = 0;
  
  @override
  void preStart() {
    // Reset counter every minute
    context.timers.startTimerAtFixedRate(
      'reset', 'reset-counter', Duration(minutes: 1));
  }
  
  @override
  Future<void> onMessage(dynamic message) async {
    if (message == 'request') {
      if (_requestCount < 100) {
        _requestCount++;
        context.sender?.tell(LocalMessage(payload: 'allowed'));
      } else {
        context.sender?.tell(LocalMessage(payload: 'rate-limited'));
      }
    } else if (message == 'reset-counter') {
      _requestCount = 0;
    }
  }
}

๐Ÿ“ก Event Bus for Event-Driven Architecture #

The event bus enables publish-subscribe messaging patterns for building event-driven applications:

Basic Event Publishing and Subscription #

// Define event types
class OrderCreated {
  final String orderId;
  final DateTime timestamp;
  final double amount;
  
  OrderCreated(this.orderId, this.timestamp, this.amount);
}

class PaymentProcessed {
  final String orderId;
  final double amount;
  
  PaymentProcessed(this.orderId, this.amount);
}

// Publisher actor
class OrderService extends Actor {
  @override
  Future<void> onMessage(dynamic message) async {
    if (message is LocalMessage && message.payload == 'create_order') {
      final orderId = 'ORDER-${DateTime.now().millisecondsSinceEpoch}';
      
      // Publish event to all subscribers
      context.publish(OrderCreated(orderId, DateTime.now(), 99.99));
    }
  }
}

// Subscriber actor
class PaymentService extends Actor {
  @override
  void preStart() {
    // Subscribe to order events
    context.subscribe<OrderCreated>();
  }

  @override
  Future<void> onMessage(dynamic message) async {
    if (message is OrderCreated) {
      print('Processing payment for order: ${message.orderId}');
      
      // Process payment and publish completion event
      context.publish(PaymentProcessed(message.orderId, message.amount));
    }
  }
}

Event Bus API #

// Subscribe to events (typically in preStart)
context.subscribe<OrderCreated>();

// Publish events
context.publish(OrderCreated('order-123', DateTime.now(), 99.99));

// Unsubscribe from events
context.unsubscribe<OrderCreated>();

// Direct event bus access
system.eventBus.subscribe<OrderCreated>(actorRef);
system.eventBus.publish(OrderCreated('order-123', DateTime.now(), 99.99));
system.eventBus.unsubscribe<OrderCreated>(actorRef);

Event-Driven Microservices #

class OrderProcessingSystem {
  late ActorSystem system;
  
  Future<void> start() async {
    system = ActorSystem.create();
    
    // Spawn services that communicate via events
    await system.spawn('order-service', () => OrderService());
    await system.spawn('payment-service', () => PaymentService());
    await system.spawn('shipping-service', () => ShippingService());
    await system.spawn('notification-service', () => NotificationService());
  }
}

class ShippingService extends Actor {
  @override
  void preStart() {
    context.subscribe<PaymentProcessed>();
  }

  @override
  Future<void> onMessage(dynamic message) async {
    if (message is PaymentProcessed) {
      print('Shipping order: ${message.orderId}');
      context.publish(OrderShipped(message.orderId, 'TRACK-123'));
    }
  }
}

class NotificationService extends Actor {
  @override
  void preStart() {
    // Subscribe to all order events
    context.subscribe<OrderCreated>();
    context.subscribe<PaymentProcessed>();
    context.subscribe<OrderShipped>();
  }

  @override
  Future<void> onMessage(dynamic message) async {
    switch (message.runtimeType) {
      case OrderCreated:
        print('๐Ÿ“ง Order confirmation sent');
        break;
      case PaymentProcessed:
        print('๐Ÿ“ง Payment confirmation sent');
        break;
      case OrderShipped:
        print('๐Ÿ“ง Shipping notification sent');
        break;
    }
  }
}

Saga Pattern Implementation #

class OrderSaga extends Actor {
  final Map<String, SagaState> _sagas = {};

  @override
  void preStart() {
    context.subscribe<OrderCreated>();
    context.subscribe<PaymentProcessed>();
    context.subscribe<PaymentFailed>();
    context.subscribe<OrderShipped>();
  }

  @override
  Future<void> onMessage(dynamic message) async {
    switch (message.runtimeType) {
      case OrderCreated:
        final event = message as OrderCreated;
        _sagas[event.orderId] = SagaState.orderCreated;
        // Trigger payment processing
        context.publish(ProcessPayment(event.orderId, event.amount));
        break;
        
      case PaymentProcessed:
        final event = message as PaymentProcessed;
        if (_sagas[event.orderId] == SagaState.orderCreated) {
          _sagas[event.orderId] = SagaState.paymentProcessed;
          // Trigger shipping
          context.publish(ShipOrder(event.orderId));
        }
        break;
        
      case PaymentFailed:
        final event = message as PaymentFailed;
        // Compensate: cancel order
        context.publish(CancelOrder(event.orderId));
        _sagas.remove(event.orderId);
        break;
        
      case OrderShipped:
        final event = message as OrderShipped;
        // Saga completed successfully
        _sagas.remove(event.orderId);
        break;
    }
  }
}

enum SagaState { orderCreated, paymentProcessed, shipped }

Event Bus Monitoring #

// Monitor event bus activity
final eventStream = system.events;
eventStream.listen((event) {
  print('Event bus activity: $event');
});

// Check event bus metrics
print('Active subscribers: ${system.eventBus.subscriberCount}');
print('Total subscriptions: ${system.eventBus.subscriptionCount}');

Event Bus Features #

  • Type-Safe: Events are strongly typed using Dart's type system
  • Automatic Cleanup: Subscriptions are automatically removed when actors stop
  • High Performance: Efficient O(1) routing for direct type matches
  • Memory Safe: No memory leaks from orphaned subscriptions
  • Observable: Built-in monitoring stream for debugging and metrics

๐Ÿ“Š Metrics and Observability #

Built-in metrics for monitoring your actor system:

// Create system with metrics
final metrics = InMemoryMetricsCollector();
final system = ActorSystem.create(ActorSystemConfig(
  metricsCollector: metrics,
));

// Spawn actors and send messages...
final actor = await system.spawn('test', () => MyActor());
actor.tell('hello');

// Check metrics
print('Actors spawned: ${metrics.getCounter('actors.spawned')}');
print('Active actors: ${metrics.getGauge('actors.active')}');
print('Messages processed: ${metrics.getCounter('messages.processed')}');
print('Processing times: ${metrics.getTimings('messages.processing_time')}');

Available metrics:

  • actors.spawned - Total actors created
  • actors.active - Current active actors
  • actors.stopped - Total actors stopped
  • actors.failed - Total actor failures
  • messages.processed - Total messages processed
  • messages.processing_time - Message processing latencies
  • dead_letters - Undeliverable messages
  • mailbox.size - Current mailbox sizes

๐Ÿ” Dead Letter Queue #

Handle undeliverable messages:

final actor = await system.spawn('test', () => MyActor());
await system.stop(actor);

// This message will go to dead letters
actor.tell('late_message');

// Check dead letter queue
final deadLetter = system.deadLetterQueue.dequeue();
if (deadLetter != null) {
  print('Undelivered message: ${deadLetter.message}');
  print('Intended recipient: ${deadLetter.recipient}');
}

๐ŸŽฎ Real-World Examples #

Chat Server #

class ChatRoom extends Actor {
  final Set<ActorRef> _participants = {};

  @override
  Future<void> onMessage(dynamic message) async {
    if (message is JoinRoom) {
      _participants.add(message.user);
      message.user.tell('Welcome to the chat!');
    } else if (message is ChatMessage) {
      // Broadcast to all participants
      for (final participant in _participants) {
        participant.tell('${message.sender}: ${message.text}');
      }
    } else if (message is LeaveRoom) {
      _participants.remove(message.user);
    }
  }
}

class JoinRoom {
  final ActorRef user;
  JoinRoom(this.user);
}

class ChatMessage {
  final String sender;
  final String text;
  ChatMessage(this.sender, this.text);
}

Game Entity System #

class GameEntity extends Actor {
  double x = 0, y = 0;
  int health = 100;

  @override
  Future<void> onMessage(dynamic message) async {
    switch (message.runtimeType) {
      case MoveCommand:
        final move = message as MoveCommand;
        x += move.deltaX;
        y += move.deltaY;
        // Notify other systems
        context.system.eventBus.publish(EntityMoved(context.self, x, y));
        break;
        
      case DamageCommand:
        final damage = message as DamageCommand;
        health -= damage.amount;
        if (health <= 0) {
          context.system.eventBus.publish(EntityDestroyed(context.self));
          context.stop();
        }
        break;
    }
  }
}

Microservice Coordination #

class OrderService extends Actor {
  @override
  Future<void> onMessage(dynamic message) async {
    if (message is ProcessOrder) {
      try {
        // Validate order
        final validation = await context.system
            .actorOf('validation-service')
            .ask(ValidateOrder(message.order), Duration(seconds: 5));
            
        if (validation.isValid) {
          // Process payment
          final payment = await context.system
              .actorOf('payment-service')
              .ask(ProcessPayment(message.order), Duration(seconds: 10));
              
          if (payment.successful) {
            // Ship order
            context.system
                .actorOf('shipping-service')
                .tell(ShipOrder(message.order));
          }
        }
      } catch (e) {
        // Handle service failures
        context.system
            .actorOf('notification-service')
            .tell(OrderFailed(message.order, e.toString()));
      }
    }
  }
}

โš™๏ธ Configuration #

Actor System Configuration #

final config = ActorSystemConfig(
  // Ask pattern configuration
  askConfig: AskConfig(
    defaultTimeout: Duration(seconds: 5),
    maxRetries: 3,
    retryBackoffBase: Duration(milliseconds: 100),
  ),
  
  // Metrics collection
  metricsCollector: InMemoryMetricsCollector(),
  
  // Dispatcher configuration
  dispatcherConfig: DispatcherConfig(
    corePoolSize: 4,
    maximumPoolSize: 8,
  ),
);

final system = ActorSystem.create(config);

๐Ÿš€ Performance #

Dactor is designed for high performance:

  • Throughput: >29,000 messages/second
  • Latency: <112ฮผs average message processing
  • Memory: <1KB overhead per idle actor
  • Scalability: Supports thousands of concurrent actors

Run the benchmarks:

dart test test/benchmark/

Example benchmark results:

Single Actor - Processed 100000 messages in 3401ms
Single Actor - Throughput: 29403.69 msg/sec

Pooled Actor - Processed 100000 messages in 1250ms  
Pooled Actor - Throughput: 80000.00 msg/sec

๐Ÿงช Testing Your Actors #

Dactor includes a dedicated testing toolkit to help you write reliable tests for your actors. The primary tools are TestActorSystem and TestProbe.

TestActorSystem #

For testing, you should use TestActorSystem instead of LocalActorSystem. It provides helper methods for creating testing utilities like probes.

import 'package:dactor/dactor.dart';
import 'package:test/test.dart';

void main() {
  group('My Actor Tests', () {
    late TestActorSystem system;
    
    setUp(() {
      // Use TestActorSystem for your tests
      system = TestActorSystem();
    });
    
    tearDown(() async {
      await system.shutdown();
    });

    // ... your tests
  });
}

TestProbe: Your Testing Companion #

A TestProbe is a special actor that you can use to send messages to your actors and assert the replies. It acts as a "black box" test double with enhanced capabilities for robust testing.

Enhanced TestProbe API

The TestProbe provides several methods for testing actor interactions:

// Create a probe
final probe = await system.createProbe();

// Expect a specific message
await probe.expectMsg('expected_payload');

// Expect a message with timeout
await probe.expectMsg('expected_payload', timeout: Duration(seconds: 5));

// Expect a message of specific type
final msg = await probe.expectMsgType<String>();
final msg = await probe.expectMsgType<MyCustomType>(timeout: Duration(seconds: 3));

// Access the last received message
final lastMsg = probe.lastMessage;

// Reply to the sender of the last message
probe.reply('response_payload');

Expecting Messages

You can use a probe to verify that your actor sends an expected message.

class MyActor extends Actor {
  @override
  Future<void> onMessage(dynamic message) async {
    if (message == 'ping') {
      context.sender?.tell(LocalMessage(payload: 'pong'));
    }
  }
}

test('should respond with pong', () async {
  // 1. Create a probe
  final probe = await system.createProbe();
  
  // 2. Spawn the actor under test
  final actor = await system.spawn('my-actor', () => MyActor());
  
  // 3. Send a message from the probe to the actor (CORRECT sender passing)
  actor.tell(LocalMessage(payload: 'ping'), sender: probe.ref);
  
  // 4. Assert that the probe receives the expected reply
  await probe.expectMsg('pong');
});

Type-Safe Message Expectations

Use expectMsgType<T>() for type-safe message assertions:

test('should receive typed message', () async {
  final probe = await system.createProbe();
  final actor = await system.spawn('test', () => MyActor());
  
  actor.tell(LocalMessage(payload: 'get_number'), sender: probe.ref);
  
  // Expect a message of specific type and get it back
  final number = await probe.expectMsgType<int>(timeout: Duration(seconds: 2));
  expect(number, greaterThan(0));
});

Replying from a Probe

A probe can also reply to messages, allowing you to test more complex interactions.

class AskerActor extends Actor {
  @override
  Future<void> onMessage(dynamic message) async {
    if (message == 'ask_and_reply') {
      final response = await context.sender?.ask(LocalMessage(payload: 'question'));
      context.sender?.tell(LocalMessage(payload: 'response: ${response?.payload}'));
    }
  }
}

test('should handle replies from probe', () async {
  final probe = await system.createProbe();
  final actor = await system.spawn('asker', () => AskerActor());

  actor.tell(LocalMessage(payload: 'ask_and_reply'), sender: probe.ref);

  // Expect the question from the actor
  await probe.expectMsg('question');
  
  // Reply from the probe
  probe.reply('answer');

  // Expect the final response
  await probe.expectMsg('response: answer');
});

Testing Best Practices

  1. Always pass sender correctly: Use actor.tell(message, sender: probe.ref) not actor.tell(LocalMessage(payload: data, sender: probe.ref))

  2. Use timeouts for reliability: Always specify timeouts for expectMsg and expectMsgType in production tests

  3. Leverage type safety: Use expectMsgType<T>() when you know the expected message type

  4. Access last message: Use probe.lastMessage to inspect the most recent message for detailed assertions

Tracing Message Flow #

For more complex scenarios, you can trace the entire flow of a message through the system. This is useful for debugging and ensuring messages are processed correctly.

To enable tracing, configure your TestActorSystem with an InMemoryTraceCollector.

import 'package:dactor/src/tracing/tracing.dart';

class HelloActor extends Actor {
  @override
  Future<void> onMessage(dynamic message) async {
    if (message.payload.toString() == 'hello') {
      context.sender?.tell(LocalMessage(payload: 'world'));
    }
  }
}

void main() {
  group('Message Tracing', () {
    late TestActorSystem system;
    late InMemoryTraceCollector collector;

    setUp(() {
      // 1. Create a collector
      collector = InMemoryTraceCollector();
      // 2. Create the system with the collector
      system = TestActorSystem(ActorSystemConfig(traceCollector: collector));
    });

    tearDown(() async => await system.shutdown());

    test('should trace message flow', () async {
      final probe = await system.createProbe();
      final actor = await system.spawn('test', () => HelloActor());

      // Send a message with the probe as the sender
      final message = LocalMessage(payload: 'hello', sender: probe.ref);
      actor.tell(message);

      // Wait for messages to be processed
      await probe.expectMsg('world');

      // 3. Inspect the trace
      final trace = collector.traces[message.correlationId];
      expect(trace, isNotNull);
      expect(trace!.length, 2);
      expect(trace[0].event, 'sent');
      expect(trace[1].event, 'processed');
    });
  });
}

The InMemoryTraceCollector stores a list of TraceEvent objects for each correlationId. Each event records what happened to the message (e.g., sent, processed, replied) and which actor was involved.

๐ŸŽฏ Best Practices #

1. Design for Immutability #

// Good: Immutable message
class OrderCreated {
  final String orderId;
  final DateTime timestamp;
  final List<String> items;
  
  const OrderCreated(this.orderId, this.timestamp, this.items);
}

// Avoid: Mutable state in messages
class BadMessage {
  String data; // Mutable field
  BadMessage(this.data);
}

2. Use Supervision Hierarchies #

// Create supervision trees for fault isolation
final supervisor = await system.spawn('app-supervisor', () => AppSupervisor());
final dbSupervisor = await supervisor.supervise('db-supervisor', () => DbSupervisor());
final worker = await dbSupervisor.supervise('db-worker', () => DbWorker());

3. Configure Ask Pattern Appropriately #

// Use development config during development
final devSystem = ActorSystem.create(ActorSystemConfig(
  askConfig: AskConfig.development(), // 30s timeout, 5 retries
));

// Use production config in production
final prodSystem = ActorSystem.create(ActorSystemConfig(
  askConfig: AskConfig.production(), // 3s timeout, 2 retries
));

4. Monitor with Metrics #

// Always enable metrics in production
final system = ActorSystem.create(ActorSystemConfig(
  metricsCollector: InMemoryMetricsCollector(),
));

// Regularly check system health
Timer.periodic(Duration(minutes: 1), (_) {
  final activeActors = metrics.getGauge('actors.active');
  final failedActors = metrics.getCounter('actors.failed');
  print('System health: $activeActors active, $failedActors failed');
});

๐Ÿ”ง Advanced Features #

Custom Message Types #

abstract class GameMessage implements Message {
  @override
  String get correlationId => 'game-${DateTime.now().millisecondsSinceEpoch}';
  
  @override
  Map<String, dynamic> get metadata => {'game': true};
  
  @override
  ActorRef? get replyTo => null;
  
  @override
  DateTime get timestamp => DateTime.now();
}

class PlayerMove extends GameMessage {
  final String playerId;
  final double x, y;
  PlayerMove(this.playerId, this.x, this.y);
}

Actor Context Usage #

class ContextActor extends Actor {
  @override
  Future<void> onMessage(dynamic message) async {
    // Access actor context
    print('My ID: ${context.self.id}');
    print('Parent: ${context.parent?.id}');
    print('System: ${context.system}');
    
    // Create child actors
    final child = await context.actorOf('child', () => ChildActor());
    
    // Stop child actors
    await context.stop(child);
    
    // Access children
    print('Children: ${context.children.map((c) => c.id)}');
  }
}

๐Ÿ“– API Reference #

Core Classes #

  • Actor - Base class for all actors
  • ActorRef - Reference to an actor for sending messages
  • ActorSystem - Manages actor lifecycle and messaging
  • EventBus - Manages event subscriptions and publishing
  • Message - Interface for messages passed between actors
  • SupervisorActor - Base class for supervising other actors
  • TimerScheduler - Manages scheduled message delivery within actors

Key Methods #

Actor System:

  • system.spawn(id, factory) - Create a new actor
  • actor.tell(message) - Send fire-and-forget message
  • actor.ask(message, timeout) - Send request-response message
  • system.stop(actor) - Stop an actor gracefully
  • system.shutdown() - Shutdown the entire system

Event Bus:

  • context.publish<T>(event) - Publish an event to all subscribers
  • context.subscribe<T>() - Subscribe to events of type T
  • context.unsubscribe<T>() - Unsubscribe from events of type T
  • system.eventBus.publish<T>(event) - Direct event bus publishing
  • system.eventBus.subscribe<T>(actor) - Direct event bus subscription
  • system.events - Stream of event bus monitoring events

Timer Scheduler:

  • context.timers.startSingleTimer(key, message, delay) - Schedule single message
  • context.timers.startTimerWithFixedDelay(key, message, delay) - Schedule with fixed delay
  • context.timers.startTimerAtFixedRate(key, message, interval) - Schedule at fixed rate
  • context.timers.cancel(key) - Cancel specific timer
  • context.timers.cancelAll() - Cancel all timers
  • context.timers.isTimerActive(key) - Check if timer is active

๐Ÿค Contributing #

We welcome contributions! Please see our Contributing Guide for details.

๐Ÿ“„ License #

This project is licensed under the MIT License - see the LICENSE file for details.

๐Ÿ™ Acknowledgments #

  • Inspired by the Akka actor system
  • Built for the Dart ecosystem
  • Designed for OverNode's distributed architecture

Ready to build fault-tolerant, concurrent applications?

dart pub add dactor

Start building with actors today! ๐Ÿš€

1
likes
130
points
25
downloads

Publisher

unverified uploader

Weekly Downloads

An actor system for Dart

Repository (GitHub)
View/report issues

Documentation

API reference

License

MIT (license)

Dependencies

cbor, meta, test, uuid

More

Packages that depend on dactor