Mpmc class

MPMC (Multiple-Producer Multiple-Consumer) channels - Maximum flexibility for complex patterns.

The most general channel type supporting concurrent producers AND consumers. Messages are distributed among consumers using competitive consumption - each message is received by exactly one consumer (no broadcasting).

Core Characteristics

  • Multiple producers: Thread-safe concurrent sending
  • Multiple consumers: Competitive message consumption (not broadcast)
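  • FIFO ordering: Messages are dequeued in the order they were sent; because each message goes to a single consumer, the order in which different consumers finish processing is not guaranteed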
  • Load balancing: Work automatically distributed among available consumers
  • Full cloning: Both senders and receivers can be cloned
  • Fair scheduling: Consumers compete fairly for messages

When to Use MPMC

  • Worker pools: Multiple producers ↔ multiple worker consumers
  • Load balancing: Distribute work across multiple processors
  • Pipeline stages: Multi-stage processing with multiple workers per stage
  • Distributed systems: Multiple clients ↔ multiple servers
  • Parallel processing: Fan-out work distribution patterns
  • Resource pools: Multiple requesters ↔ multiple resource providers

Important: Competitive Consumption

Unlike broadcast channels, each message goes to exactly one consumer. This enables load balancing but means consumers compete for messages.
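To make the "exactly one consumer" rule concrete, here is a minimal toy model in plain Dart. It is not how cross_channel is implemented; ToyWorkQueue, runWorkers, and the null end-of-stream sentinel are hypothetical names and choices made only for this illustration. Two workers pull from one shared queue, and every item ends up with exactly one of them.

```dart
import 'dart:async';
import 'dart:collection';

/// Toy competitive-consumption queue (hypothetical, for illustration only;
/// this is NOT the cross_channel implementation).
class ToyWorkQueue<T> {
  final Queue<T> _items = Queue<T>();
  final Queue<Completer<T?>> _waiters = Queue<Completer<T?>>();
  bool _closed = false;

  /// Hands the item to the longest-waiting consumer, or buffers it.
  void send(T item) {
    if (_closed) throw StateError('queue is closed');
    if (_waiters.isNotEmpty) {
      _waiters.removeFirst().complete(item);
    } else {
      _items.add(item);
    }
  }

  /// Completes with the next item, or with null once closed and drained.
  Future<T?> recv() {
    if (_items.isNotEmpty) return Future<T?>.value(_items.removeFirst());
    if (_closed) return Future<T?>.value(null);
    final waiter = Completer<T?>();
    _waiters.add(waiter);
    return waiter.future;
  }

  /// Stops accepting items and wakes any consumers that are still waiting.
  void close() {
    _closed = true;
    while (_waiters.isNotEmpty) {
      _waiters.removeFirst().complete(null);
    }
  }
}

/// Runs two competing workers and returns what each one received.
Future<Map<String, List<int>>> runWorkers() async {
  final queue = ToyWorkQueue<int>();
  final seen = <String, List<int>>{'A': <int>[], 'B': <int>[]};

  Future<void> worker(String name) async {
    while (true) {
      final item = await queue.recv();
      if (item == null) break; // closed and drained
      seen[name]!.add(item);
      // Yield to the event loop so the other worker gets a turn.
      await Future<void>.delayed(Duration.zero);
    }
  }

  final workers = [worker('A'), worker('B')];
  for (var i = 0; i < 6; i++) {
    queue.send(i);
  }
  queue.close();
  await Future.wait(workers);
  return seen;
}

Future<void> main() async {
  final seen = await runWorkers();
  print(seen); // each of 0..5 appears under exactly one worker
}
```

How the items split between the two workers depends on scheduling, so the sketch only relies on the invariant that no item is delivered twice and none is lost.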

Performance Considerations

  • Coordination overhead: Higher than MPSC due to consumer coordination
  • Good throughput: roughly 525-937 ns per operation (varies with hardware and workload)
  • Latest-only mode (Mpmc.latest): Very fast for coalescing buffers
  • Memory efficiency: Shared buffer across all consumers
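The latest-only mode mentioned above can be tried with a sketch like the following. It uses only calls that appear elsewhere on this page (Mpmc.latest, send, close, stream) and assumes that a latest-only channel keeps just the most recent unread value, so a slow consumer may see fewer values than were sent. The helper name latestOnly is hypothetical, and the exact values delivered are not asserted here because they depend on the library's coalescing behavior.

```dart
import 'dart:async';

import 'package:cross_channel/mpmc.dart';

/// Sends three readings before any consumer runs, then collects whatever
/// the latest-only channel still delivers (hypothetical helper).
Future<List<int>> latestOnly() async {
  final (tx, rx) = Mpmc.latest<int>();

  // Assumption: newer values replace older unread ones instead of queueing.
  for (var reading = 1; reading <= 3; reading++) {
    await tx.send(reading);
  }
  tx.close();

  final received = <int>[];
  await for (final value in rx.stream()) {
    received.add(value);
  }
  return received;
}

Future<void> main() async {
  print('Received: ${await latestOnly()}');
}
```

This mode fits state-like data (sensor readings, progress updates) where only the newest value matters; for work items that must all be processed, a bounded or unbounded channel is the safer choice.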

Example

import 'dart:async';

import 'package:cross_channel/mpmc.dart';

Future<void> main() async {
  // 1. Create a bounded MPMC channel for N:M communication
  // Limited work queue prevents memory exhaustion (natural backpressure)
  final (taskTx, taskRx) = Mpmc.bounded<String>(100);

  // 2. Multiple Job Producers
  for (int i = 0; i < 2; i++) {
    final producer = taskTx.clone();
    unawaited(Future.microtask(() async {
      await producer.send('Task from Producer $i');
      producer.close();
    }));
  }
  taskTx.close();

  // 3. Multiple Worker Consumers (Competitive Consumption)
  final futures = <Future<void>>[];
  for (int i = 0; i < 2; i++) {
    final worker = taskRx.clone();
    futures.add(Future.microtask(() async {
      await for (final task in worker.stream()) {
        print('Worker $i processed: $task');
      }
    }));
  }
  taskRx.close();

  await Future.wait(futures);
}

Constructors

Mpmc()

Properties

hashCode → int
The hash code for this object.
no setter, inherited
runtimeType → Type
A representation of the runtime type of the object.
no setter, inherited

Methods

noSuchMethod(Invocation invocation) → dynamic
Invoked when a nonexistent method or property is accessed.
inherited
toString() → String
A string representation of this object.
inherited

Operators

operator ==(Object other) → bool
The equality operator.
inherited

Static Methods

bounded<T>(int capacity, {String? metricsId}) → MpmcChannel<T>
channel<T>({int? capacity, DropPolicy policy = DropPolicy.block, OnDrop<T>? onDrop, bool chunked = true, String? metricsId}) → MpmcChannel<T>
latest<T>({String? metricsId}) → MpmcChannel<T>
unbounded<T>({bool chunked = true, String? metricsId}) → MpmcChannel<T>
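As a rough usage sketch for the factory methods listed above, the following creates a channel through the general-purpose channel<T> factory and does a single-consumer round trip. It assumes, as in the example earlier on this page, that the returned MpmcChannel<T> destructures into a (sender, receiver) pair and that the receiver's stream ends once every sender is closed. The helper name roundTrip is hypothetical, and policy is left at its documented default (DropPolicy.block).

```dart
import 'dart:async';

import 'package:cross_channel/mpmc.dart';

/// Sends three jobs through a small channel and returns what one consumer
/// received (hypothetical helper).
Future<List<String>> roundTrip() async {
  // Only `capacity` is set; the other named parameters keep their defaults.
  final (tx, rx) = Mpmc.channel<String>(capacity: 4);

  final received = <String>[];
  // Start the consumer first so it drains the channel as jobs arrive.
  final consumer = () async {
    await for (final job in rx.stream()) {
      received.add(job);
    }
  }();

  for (final job in ['a', 'b', 'c']) {
    await tx.send(job);
  }
  tx.close(); // assumed to end the consumer's stream once it is drained

  await consumer;
  return received;
}

Future<void> main() async {
  print(await roundTrip());
}
```

The other factories follow the same shape according to their signatures: Mpmc.bounded<T>(capacity) takes the capacity positionally, while Mpmc.unbounded<T>() and Mpmc.latest<T>() take no required arguments.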