Mpmc class
MPMC (Multiple-Producer Multiple-Consumer) channels - Maximum flexibility for complex patterns.
The most general channel type supporting concurrent producers AND consumers. Messages are distributed among consumers using competitive consumption - each message is received by exactly one consumer (no broadcasting).
Core Characteristics
- Multiple producers: Thread-safe concurrent sending
- Multiple consumers: Competitive message consumption (not broadcast)
- FIFO ordering: Messages maintain send order across all consumers
- Load balancing: Work automatically distributed among available consumers
- Full cloning: Both senders and receivers can be cloned
- Fair scheduling: Consumers compete fairly for messages
When to Use MPMC
- Worker pools: Multiple producers ↔ multiple worker consumers
- Load balancing: Distribute work across multiple processors
- Pipeline stages: Multi-stage processing with multiple workers per stage
- Distributed systems: Multiple clients ↔ multiple servers
- Parallel processing: Fan-out work distribution patterns
- Resource pools: Multiple requesters ↔ multiple resource providers
Important: Competitive Consumption
Unlike broadcast channels, each message goes to exactly one consumer. This enables load balancing but means consumers compete for messages.
Performance Considerations
- Coordination overhead: Higher than MPSC due to consumer coordination
- Good throughput: ~525-937ns per operation
- Latest-only mode: Very fast for coalescing buffers
- Memory efficiency: Shared buffer across all consumers
Example
import 'dart:async';
import 'package:cross_channel/mpmc.dart';
Future<void> main() async {
// 1. Create a bounded MPMC channel for N:M communication
// Limited work queue prevents memory exhaustion (natural backpressure)
final (taskTx, taskRx) = Mpmc.bounded<String>(100);
// 2. Multiple Job Producers
for (int i = 0; i < 2; i++) {
final producer = taskTx.clone();
unawaited(Future.microtask(() async {
await producer.send('Task from Producer $i');
producer.close();
}));
}
taskTx.close();
// 3. Multiple Worker Consumers (Competitive Consumption)
final futures = <Future<void>>[];
for (int i = 0; i < 2; i++) {
final worker = taskRx.clone();
futures.add(Future.microtask(() async {
await for (final task in worker.stream()) {
print('Worker $i processed: $task');
}
}));
}
taskRx.close();
await Future.wait(futures);
}
Constructors
- Mpmc()
Properties
- hashCode → int
-
The hash code for this object.
no setterinherited
- runtimeType → Type
-
A representation of the runtime type of the object.
no setterinherited
Methods
-
noSuchMethod(
Invocation invocation) → dynamic -
Invoked when a nonexistent method or property is accessed.
inherited
-
toString(
) → String -
A string representation of this object.
inherited
Operators
-
operator ==(
Object other) → bool -
The equality operator.
inherited
Static Methods
-
bounded<
T> (int capacity, {String? metricsId}) → MpmcChannel< T> -
channel<
T> ({int? capacity, DropPolicy policy = DropPolicy.block, OnDrop< T> ? onDrop, bool chunked = true, String? metricsId}) → MpmcChannel<T> -
latest<
T> ({String? metricsId}) → MpmcChannel< T> -
unbounded<
T> ({bool chunked = true, String? metricsId}) → MpmcChannel< T>